blob: 4d5df404628698fcb644a5f21e94568dded25eb7 [file] [log] [blame]
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"errors"
"fmt"
"regexp"
"sort"
"strconv"
"strings"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network"
"github.com/Azure/go-autorest/autorest/to"
"k8s.io/klog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
)
var (
// ErrorNotVmssInstance indicates an instance is not belongint to any vmss.
ErrorNotVmssInstance = errors.New("not a vmss instance")
scaleSetNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/Microsoft.Compute/virtualMachineScaleSets/(.+)/virtualMachines(?:.*)`)
resourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(?:.*)/virtualMachines(?:.*)`)
vmssNicResourceGroupRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/Microsoft.Compute/virtualMachineScaleSets/(?:.*)/virtualMachines/(?:.*)/networkInterfaces/(?:.*)`)
vmssMachineIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachineScaleSets/%s/virtualMachines/%s"
)
// scaleSet implements VMSet interface for Azure scale set.
type scaleSet struct {
*Cloud
// availabilitySet is also required for scaleSet because some instances
// (e.g. master nodes) may not belong to any scale sets.
availabilitySet VMSet
vmssCache *timedCache
vmssVMCache *timedCache
nodeNameToScaleSetMappingCache *timedCache
availabilitySetNodesCache *timedCache
}
// newScaleSet creates a new scaleSet.
func newScaleSet(az *Cloud) (VMSet, error) {
var err error
ss := &scaleSet{
Cloud: az,
availabilitySet: newAvailabilitySet(az),
}
ss.nodeNameToScaleSetMappingCache, err = ss.newNodeNameToScaleSetMappingCache()
if err != nil {
return nil, err
}
ss.availabilitySetNodesCache, err = ss.newAvailabilitySetNodesCache()
if err != nil {
return nil, err
}
ss.vmssCache, err = ss.newVmssCache()
if err != nil {
return nil, err
}
ss.vmssVMCache, err = ss.newVmssVMCache()
if err != nil {
return nil, err
}
return ss, nil
}
// getVmssVM gets virtualMachineScaleSetVM by nodeName from cache.
// It returns cloudprovider.InstanceNotFound if node does not belong to any scale sets.
func (ss *scaleSet) getVmssVM(nodeName string) (ssName, instanceID string, vm compute.VirtualMachineScaleSetVM, err error) {
instanceID, err = getScaleSetVMInstanceID(nodeName)
if err != nil {
return ssName, instanceID, vm, err
}
ssName, err = ss.getScaleSetNameByNodeName(nodeName)
if err != nil {
return ssName, instanceID, vm, err
}
if ssName == "" {
return "", "", vm, cloudprovider.InstanceNotFound
}
resourceGroup, err := ss.GetNodeResourceGroup(nodeName)
if err != nil {
return "", "", vm, err
}
klog.V(4).Infof("getVmssVM gets scaleSetName (%q) and instanceID (%q) for node %q", ssName, instanceID, nodeName)
key := buildVmssCacheKey(resourceGroup, ss.makeVmssVMName(ssName, instanceID))
cachedVM, err := ss.vmssVMCache.Get(key)
if err != nil {
return ssName, instanceID, vm, err
}
if cachedVM == nil {
klog.Errorf("Can't find node (%q) in any scale sets", nodeName)
return ssName, instanceID, vm, cloudprovider.InstanceNotFound
}
return ssName, instanceID, *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
}
// GetPowerStatusByNodeName returns the power state of the specified node.
func (ss *scaleSet) GetPowerStatusByNodeName(name string) (powerState string, err error) {
_, _, vm, err := ss.getVmssVM(name)
if err != nil {
return powerState, err
}
if vm.InstanceView != nil && vm.InstanceView.Statuses != nil {
statuses := *vm.InstanceView.Statuses
for _, status := range statuses {
state := to.String(status.Code)
if strings.HasPrefix(state, vmPowerStatePrefix) {
return strings.TrimPrefix(state, vmPowerStatePrefix), nil
}
}
}
return "", fmt.Errorf("failed to get power status for node %q", name)
}
// getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache.
// The node must belong to one of scale sets.
func (ss *scaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string) (vm compute.VirtualMachineScaleSetVM, err error) {
vmName := ss.makeVmssVMName(scaleSetName, instanceID)
key := buildVmssCacheKey(resourceGroup, vmName)
cachedVM, err := ss.vmssVMCache.Get(key)
if err != nil {
return vm, err
}
if cachedVM == nil {
klog.Errorf("couldn't find vmss virtual machine by scaleSetName (%s) and instanceID (%s)", scaleSetName, instanceID)
return vm, cloudprovider.InstanceNotFound
}
return *(cachedVM.(*compute.VirtualMachineScaleSetVM)), nil
}
// GetInstanceIDByNodeName gets the cloud provider ID by node name.
// It must return ("", cloudprovider.InstanceNotFound) if the instance does
// not exist or is no longer running.
func (ss *scaleSet) GetInstanceIDByNodeName(name string) (string, error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name)
if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
return "", err
}
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.GetInstanceIDByNodeName(name)
}
_, _, vm, err := ss.getVmssVM(name)
if err != nil {
return "", err
}
return *vm.ID, nil
}
// GetNodeNameByProviderID gets the node name by provider ID.
func (ss *scaleSet) GetNodeNameByProviderID(providerID string) (types.NodeName, error) {
// NodeName is not part of providerID for vmss instances.
scaleSetName, err := extractScaleSetNameByProviderID(providerID)
if err != nil {
klog.V(4).Infof("Can not extract scale set name from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err)
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
}
resourceGroup, err := extractResourceGroupByProviderID(providerID)
if err != nil {
return "", fmt.Errorf("error of extracting resource group for node %q", providerID)
}
instanceID, err := getLastSegment(providerID)
if err != nil {
klog.V(4).Infof("Can not extract instanceID from providerID (%s), assuming it is mananaged by availability set: %v", providerID, err)
return ss.availabilitySet.GetNodeNameByProviderID(providerID)
}
vm, err := ss.getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID)
if err != nil {
return "", err
}
if vm.OsProfile != nil && vm.OsProfile.ComputerName != nil {
nodeName := strings.ToLower(*vm.OsProfile.ComputerName)
return types.NodeName(nodeName), nil
}
return "", nil
}
// GetInstanceTypeByNodeName gets the instance type by node name.
func (ss *scaleSet) GetInstanceTypeByNodeName(name string) (string, error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name)
if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
return "", err
}
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.GetInstanceTypeByNodeName(name)
}
_, _, vm, err := ss.getVmssVM(name)
if err != nil {
return "", err
}
if vm.Sku != nil && vm.Sku.Name != nil {
return *vm.Sku.Name, nil
}
return "", nil
}
// GetZoneByNodeName gets availability zone for the specified node. If the node is not running
// with availability zone, then it returns fault domain.
func (ss *scaleSet) GetZoneByNodeName(name string) (cloudprovider.Zone, error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(name)
if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
return cloudprovider.Zone{}, err
}
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.GetZoneByNodeName(name)
}
_, _, vm, err := ss.getVmssVM(name)
if err != nil {
return cloudprovider.Zone{}, err
}
var failureDomain string
if vm.Zones != nil && len(*vm.Zones) > 0 {
// Get availability zone for the node.
zones := *vm.Zones
zoneID, err := strconv.Atoi(zones[0])
if err != nil {
return cloudprovider.Zone{}, fmt.Errorf("failed to parse zone %q: %v", zones, err)
}
failureDomain = ss.makeZone(zoneID)
} else if vm.InstanceView != nil && vm.InstanceView.PlatformFaultDomain != nil {
// Availability zone is not used for the node, falling back to fault domain.
failureDomain = strconv.Itoa(int(*vm.InstanceView.PlatformFaultDomain))
}
return cloudprovider.Zone{
FailureDomain: failureDomain,
Region: *vm.Location,
}, nil
}
// GetPrimaryVMSetName returns the VM set name depending on the configured vmType.
// It returns config.PrimaryScaleSetName for vmss and config.PrimaryAvailabilitySetName for standard vmType.
func (ss *scaleSet) GetPrimaryVMSetName() string {
return ss.Config.PrimaryScaleSetName
}
// GetIPByNodeName gets machine private IP and public IP by node name.
func (ss *scaleSet) GetIPByNodeName(nodeName string) (string, string, error) {
nic, err := ss.GetPrimaryInterface(nodeName)
if err != nil {
klog.Errorf("error: ss.GetIPByNodeName(%s), GetPrimaryInterface(%q), err=%v", nodeName, nodeName, err)
return "", "", err
}
ipConfig, err := getPrimaryIPConfig(nic)
if err != nil {
klog.Errorf("error: ss.GetIPByNodeName(%s), getPrimaryIPConfig(%v), err=%v", nodeName, nic, err)
return "", "", err
}
internalIP := *ipConfig.PrivateIPAddress
publicIP := ""
if ipConfig.PublicIPAddress != nil && ipConfig.PublicIPAddress.ID != nil {
pipID := *ipConfig.PublicIPAddress.ID
pipName, err := getLastSegment(pipID)
if err != nil {
return "", "", fmt.Errorf("failed to get publicIP name for node %q with pipID %q", nodeName, pipID)
}
resourceGroup, err := ss.GetNodeResourceGroup(nodeName)
if err != nil {
return "", "", err
}
pip, existsPip, err := ss.getPublicIPAddress(resourceGroup, pipName)
if err != nil {
return "", "", err
}
if existsPip {
publicIP = *pip.IPAddress
}
}
return internalIP, publicIP, nil
}
// This returns the full identifier of the primary NIC for the given VM.
func (ss *scaleSet) getPrimaryInterfaceID(machine compute.VirtualMachineScaleSetVM) (string, error) {
if len(*machine.NetworkProfile.NetworkInterfaces) == 1 {
return *(*machine.NetworkProfile.NetworkInterfaces)[0].ID, nil
}
for _, ref := range *machine.NetworkProfile.NetworkInterfaces {
if *ref.Primary {
return *ref.ID, nil
}
}
return "", fmt.Errorf("failed to find a primary nic for the vm. vmname=%q", *machine.Name)
}
// machineName is composed of computerNamePrefix and 36-based instanceID.
// And instanceID part if in fixed length of 6 characters.
// Refer https://msftstack.wordpress.com/2017/05/10/figuring-out-azure-vm-scale-set-machine-names/.
func getScaleSetVMInstanceID(machineName string) (string, error) {
nameLength := len(machineName)
if nameLength < 6 {
return "", ErrorNotVmssInstance
}
instanceID, err := strconv.ParseUint(machineName[nameLength-6:], 36, 64)
if err != nil {
return "", ErrorNotVmssInstance
}
return fmt.Sprintf("%d", instanceID), nil
}
// extractScaleSetNameByProviderID extracts the scaleset name by vmss node's ProviderID.
func extractScaleSetNameByProviderID(providerID string) (string, error) {
matches := scaleSetNameRE.FindStringSubmatch(providerID)
if len(matches) != 2 {
return "", ErrorNotVmssInstance
}
return matches[1], nil
}
// extractResourceGroupByProviderID extracts the resource group name by vmss node's ProviderID.
func extractResourceGroupByProviderID(providerID string) (string, error) {
matches := resourceGroupRE.FindStringSubmatch(providerID)
if len(matches) != 2 {
return "", ErrorNotVmssInstance
}
return matches[1], nil
}
// listScaleSets lists all scale sets.
func (ss *scaleSet) listScaleSets(resourceGroup string) ([]string, error) {
var err error
ctx, cancel := getContextWithCancel()
defer cancel()
allScaleSets, err := ss.VirtualMachineScaleSetsClient.List(ctx, resourceGroup)
if err != nil {
klog.Errorf("VirtualMachineScaleSetsClient.List failed: %v", err)
return nil, err
}
ssNames := make([]string, len(allScaleSets))
for i := range allScaleSets {
ssNames[i] = *(allScaleSets[i].Name)
}
return ssNames, nil
}
// listScaleSetVMs lists VMs belonging to the specified scale set.
func (ss *scaleSet) listScaleSetVMs(scaleSetName, resourceGroup string) ([]compute.VirtualMachineScaleSetVM, error) {
var err error
ctx, cancel := getContextWithCancel()
defer cancel()
allVMs, err := ss.VirtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSetName, "", "", string(compute.InstanceView))
if err != nil {
klog.Errorf("VirtualMachineScaleSetVMsClient.List failed: %v", err)
return nil, err
}
return allVMs, nil
}
// getAgentPoolScaleSets lists the virtual machines for the resource group and then builds
// a list of scale sets that match the nodes available to k8s.
func (ss *scaleSet) getAgentPoolScaleSets(nodes []*v1.Node) (*[]string, error) {
agentPoolScaleSets := &[]string{}
for nx := range nodes {
if isMasterNode(nodes[nx]) {
continue
}
if ss.ShouldNodeExcludedFromLoadBalancer(nodes[nx]) {
continue
}
nodeName := nodes[nx].Name
ssName, err := ss.getScaleSetNameByNodeName(nodeName)
if err != nil {
return nil, err
}
if ssName == "" {
klog.V(3).Infof("Node %q is not belonging to any known scale sets", nodeName)
continue
}
*agentPoolScaleSets = append(*agentPoolScaleSets, ssName)
}
return agentPoolScaleSets, nil
}
// GetVMSetNames selects all possible availability sets or scale sets
// (depending vmType configured) for service load balancer. If the service has
// no loadbalancer mode annotation returns the primary VMSet. If service annotation
// for loadbalancer exists then return the eligible VMSet.
func (ss *scaleSet) GetVMSetNames(service *v1.Service, nodes []*v1.Node) (vmSetNames *[]string, err error) {
hasMode, isAuto, serviceVMSetNames := getServiceLoadBalancerMode(service)
if !hasMode {
// no mode specified in service annotation default to PrimaryScaleSetName.
scaleSetNames := &[]string{ss.Config.PrimaryScaleSetName}
return scaleSetNames, nil
}
scaleSetNames, err := ss.getAgentPoolScaleSets(nodes)
if err != nil {
klog.Errorf("ss.GetVMSetNames - getAgentPoolScaleSets failed err=(%v)", err)
return nil, err
}
if len(*scaleSetNames) == 0 {
klog.Errorf("ss.GetVMSetNames - No scale sets found for nodes in the cluster, node count(%d)", len(nodes))
return nil, fmt.Errorf("No scale sets found for nodes, node count(%d)", len(nodes))
}
// sort the list to have deterministic selection
sort.Strings(*scaleSetNames)
if !isAuto {
if serviceVMSetNames == nil || len(serviceVMSetNames) == 0 {
return nil, fmt.Errorf("service annotation for LoadBalancerMode is empty, it should have __auto__ or availability sets value")
}
// validate scale set exists
var found bool
for sasx := range serviceVMSetNames {
for asx := range *scaleSetNames {
if strings.EqualFold((*scaleSetNames)[asx], serviceVMSetNames[sasx]) {
found = true
serviceVMSetNames[sasx] = (*scaleSetNames)[asx]
break
}
}
if !found {
klog.Errorf("ss.GetVMSetNames - scale set (%s) in service annotation not found", serviceVMSetNames[sasx])
return nil, fmt.Errorf("scale set (%s) - not found", serviceVMSetNames[sasx])
}
}
vmSetNames = &serviceVMSetNames
}
return vmSetNames, nil
}
// extractResourceGroupByVMSSNicID extracts the resource group name by vmss nicID.
func extractResourceGroupByVMSSNicID(nicID string) (string, error) {
matches := vmssNicResourceGroupRE.FindStringSubmatch(nicID)
if len(matches) != 2 {
return "", fmt.Errorf("error of extracting resourceGroup from nicID %q", nicID)
}
return matches[1], nil
}
// GetPrimaryInterface gets machine primary network interface by node name and vmSet.
func (ss *scaleSet) GetPrimaryInterface(nodeName string) (network.Interface, error) {
managedByAS, err := ss.isNodeManagedByAvailabilitySet(nodeName)
if err != nil {
klog.Errorf("Failed to check isNodeManagedByAvailabilitySet: %v", err)
return network.Interface{}, err
}
if managedByAS {
// vm is managed by availability set.
return ss.availabilitySet.GetPrimaryInterface(nodeName)
}
ssName, instanceID, vm, err := ss.getVmssVM(nodeName)
if err != nil {
// VM is availability set, but not cached yet in availabilitySetNodesCache.
if err == ErrorNotVmssInstance {
return ss.availabilitySet.GetPrimaryInterface(nodeName)
}
klog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getVmssVM(%s), err=%v", nodeName, nodeName, err)
return network.Interface{}, err
}
primaryInterfaceID, err := ss.getPrimaryInterfaceID(vm)
if err != nil {
klog.Errorf("error: ss.GetPrimaryInterface(%s), ss.getPrimaryInterfaceID(), err=%v", nodeName, err)
return network.Interface{}, err
}
nicName, err := getLastSegment(primaryInterfaceID)
if err != nil {
klog.Errorf("error: ss.GetPrimaryInterface(%s), getLastSegment(%s), err=%v", nodeName, primaryInterfaceID, err)
return network.Interface{}, err
}
resourceGroup, err := extractResourceGroupByVMSSNicID(primaryInterfaceID)
if err != nil {
return network.Interface{}, err
}
ctx, cancel := getContextWithCancel()
defer cancel()
nic, err := ss.InterfacesClient.GetVirtualMachineScaleSetNetworkInterface(ctx, resourceGroup, ssName, instanceID, nicName, "")
if err != nil {
klog.Errorf("error: ss.GetPrimaryInterface(%s), ss.GetVirtualMachineScaleSetNetworkInterface.Get(%s, %s, %s), err=%v", nodeName, resourceGroup, ssName, nicName, err)
return network.Interface{}, err
}
// Fix interface's location, which is required when updating the interface.
// TODO: is this a bug of azure SDK?
if nic.Location == nil || *nic.Location == "" {
nic.Location = vm.Location
}
return nic, nil
}
// getScaleSetWithRetry gets scale set with exponential backoff retry
func (ss *scaleSet) getScaleSetWithRetry(service *v1.Service, name string) (compute.VirtualMachineScaleSet, bool, error) {
var result compute.VirtualMachineScaleSet
var exists bool
err := wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
cached, retryErr := ss.vmssCache.Get(name)
if retryErr != nil {
ss.Event(service, v1.EventTypeWarning, "GetVirtualMachineScaleSet", retryErr.Error())
klog.Errorf("backoff: failure for scale set %q, will retry,err=%v", name, retryErr)
return false, nil
}
klog.V(4).Infof("backoff: success for scale set %q", name)
if cached != nil {
exists = true
result = *(cached.(*compute.VirtualMachineScaleSet))
}
return true, nil
})
return result, exists, err
}
// getPrimaryNetworkConfiguration gets primary network interface configuration for scale sets.
func (ss *scaleSet) getPrimaryNetworkConfiguration(networkConfigurationList *[]compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetNetworkConfiguration, error) {
networkConfigurations := *networkConfigurationList
if len(networkConfigurations) == 1 {
return &networkConfigurations[0], nil
}
for idx := range networkConfigurations {
networkConfig := &networkConfigurations[idx]
if networkConfig.Primary != nil && *networkConfig.Primary == true {
return networkConfig, nil
}
}
return nil, fmt.Errorf("failed to find a primary network configuration for the scale set %q", scaleSetName)
}
func (ss *scaleSet) getPrimaryIPConfigForScaleSet(config *compute.VirtualMachineScaleSetNetworkConfiguration, scaleSetName string) (*compute.VirtualMachineScaleSetIPConfiguration, error) {
ipConfigurations := *config.IPConfigurations
if len(ipConfigurations) == 1 {
return &ipConfigurations[0], nil
}
for idx := range ipConfigurations {
ipConfig := &ipConfigurations[idx]
if ipConfig.Primary != nil && *ipConfig.Primary == true {
return ipConfig, nil
}
}
return nil, fmt.Errorf("failed to find a primary IP configuration for the scale set %q", scaleSetName)
}
// createOrUpdateVMSSWithRetry invokes ss.VirtualMachineScaleSetsClient.CreateOrUpdate with exponential backoff retry.
func (ss *scaleSet) createOrUpdateVMSSWithRetry(service *v1.Service, virtualMachineScaleSet compute.VirtualMachineScaleSet) error {
return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, *virtualMachineScaleSet.Name, virtualMachineScaleSet)
klog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%s): end", *virtualMachineScaleSet.Name)
return ss.processHTTPRetryResponse(service, "CreateOrUpdateVMSS", resp, err)
})
}
// updateVMSSInstancesWithRetry invokes ss.VirtualMachineScaleSetsClient.UpdateInstances with exponential backoff retry.
func (ss *scaleSet) updateVMSSInstancesWithRetry(service *v1.Service, scaleSetName string, vmInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs) error {
return wait.ExponentialBackoff(ss.requestBackoff(), func() (bool, error) {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.UpdateInstances(ctx, ss.ResourceGroup, scaleSetName, vmInstanceIDs)
klog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%s): end", scaleSetName)
return ss.processHTTPRetryResponse(service, "CreateOrUpdateVMSSInstance", resp, err)
})
}
// getNodesScaleSets returns scalesets with instanceIDs and standard node names for given nodes.
func (ss *scaleSet) getNodesScaleSets(nodes []*v1.Node) (map[string]sets.String, []*v1.Node, error) {
scalesets := make(map[string]sets.String)
standardNodes := []*v1.Node{}
for _, curNode := range nodes {
if ss.useStandardLoadBalancer() && ss.excludeMasterNodesFromStandardLB() && isMasterNode(curNode) {
klog.V(4).Infof("Excluding master node %q from load balancer backendpool", curNode.Name)
continue
}
if ss.ShouldNodeExcludedFromLoadBalancer(curNode) {
klog.V(4).Infof("Excluding unmanaged/external-resource-group node %q", curNode.Name)
continue
}
curScaleSetName, err := extractScaleSetNameByProviderID(curNode.Spec.ProviderID)
if err != nil {
klog.V(4).Infof("Node %q is not belonging to any scale sets, assuming it is belong to availability sets", curNode.Name)
standardNodes = append(standardNodes, curNode)
continue
}
if _, ok := scalesets[curScaleSetName]; !ok {
scalesets[curScaleSetName] = sets.NewString()
}
instanceID, err := getLastSegment(curNode.Spec.ProviderID)
if err != nil {
klog.Errorf("Failed to get instance ID for node %q: %v", curNode.Spec.ProviderID, err)
return nil, nil, err
}
scalesets[curScaleSetName].Insert(instanceID)
}
return scalesets, standardNodes, nil
}
// ensureHostsInVMSetPool ensures the given Node's primary IP configurations are
// participating in the vmSet's LoadBalancer Backend Pool.
func (ss *scaleSet) ensureHostsInVMSetPool(service *v1.Service, backendPoolID string, vmSetName string, instanceIDs []string, isInternal bool) error {
klog.V(3).Infof("ensuring hosts %q of scaleset %q in LB backendpool %q", instanceIDs, vmSetName, backendPoolID)
serviceName := getServiceName(service)
virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(service, vmSetName)
if err != nil {
klog.Errorf("ss.getScaleSetWithRetry(%s) for service %q failed: %v", vmSetName, serviceName, err)
return err
}
if !exists {
errorMessage := fmt.Errorf("Scale set %q not found", vmSetName)
klog.Errorf("%v", errorMessage)
return errorMessage
}
// Find primary network interface configuration.
networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations
primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, vmSetName)
if err != nil {
return err
}
// Find primary IP configuration.
primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, vmSetName)
if err != nil {
return err
}
// Update primary IP configuration's LoadBalancerBackendAddressPools.
foundPool := false
newBackendPools := []compute.SubResource{}
if primaryIPConfiguration.LoadBalancerBackendAddressPools != nil {
newBackendPools = *primaryIPConfiguration.LoadBalancerBackendAddressPools
}
for _, existingPool := range newBackendPools {
if strings.EqualFold(backendPoolID, *existingPool.ID) {
foundPool = true
break
}
}
if !foundPool {
if ss.useStandardLoadBalancer() && len(newBackendPools) > 0 {
// Although standard load balancer supports backends from multiple vmss,
// the same network interface couldn't be added to more than one load balancer of
// the same type. Omit those nodes (e.g. masters) so Azure ARM won't complain
// about this.
for _, pool := range newBackendPools {
backendPool := *pool.ID
matches := backendPoolIDRE.FindStringSubmatch(backendPool)
if len(matches) == 2 {
lbName := matches[1]
if strings.HasSuffix(lbName, InternalLoadBalancerNameSuffix) == isInternal {
klog.V(4).Infof("vmss %q has already been added to LB %q, omit adding it to a new one", vmSetName, lbName)
return nil
}
}
}
}
newBackendPools = append(newBackendPools,
compute.SubResource{
ID: to.StringPtr(backendPoolID),
})
primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
ctx, cancel := getContextWithCancel()
defer cancel()
klog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%s): scale set (%s) - updating", serviceName, vmSetName)
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, vmSetName, virtualMachineScaleSet)
klog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
klog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%s): scale set (%s) - updating, err=%v", serviceName, vmSetName, err)
retryErr := ss.createOrUpdateVMSSWithRetry(service, virtualMachineScaleSet)
if retryErr != nil {
err = retryErr
klog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate for service (%s) abort backoff: scale set (%s) - updating", serviceName, vmSetName)
}
}
if err != nil {
return err
}
}
// Update instances to latest VMSS model.
vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
}
ctx, cancel := getContextWithCancel()
defer cancel()
instanceResp, err := ss.VirtualMachineScaleSetsClient.UpdateInstances(ctx, ss.ResourceGroup, vmSetName, vmInstanceIDs)
klog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", vmSetName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(instanceResp, err) {
klog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances for service (%s): scale set (%s) - updating, err=%v", serviceName, vmSetName, err)
retryErr := ss.updateVMSSInstancesWithRetry(service, vmSetName, vmInstanceIDs)
if retryErr != nil {
err = retryErr
klog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances for service (%s) abort backoff: scale set (%s) - updating", serviceName, vmSetName)
}
}
if err != nil {
return err
}
return nil
}
// EnsureHostsInPool ensures the given Node's primary IP configurations are
// participating in the specified LoadBalancer Backend Pool.
func (ss *scaleSet) EnsureHostsInPool(service *v1.Service, nodes []*v1.Node, backendPoolID string, vmSetName string, isInternal bool) error {
serviceName := getServiceName(service)
scalesets, standardNodes, err := ss.getNodesScaleSets(nodes)
if err != nil {
klog.Errorf("getNodesScaleSets() for service %q failed: %v", serviceName, err)
return err
}
for ssName, instanceIDs := range scalesets {
// Only add nodes belonging to specified vmSet for basic SKU LB.
if !ss.useStandardLoadBalancer() && !strings.EqualFold(ssName, vmSetName) {
continue
}
if instanceIDs.Len() == 0 {
// This may happen when scaling a vmss capacity to 0.
klog.V(3).Infof("scale set %q has 0 nodes, adding it to load balancer anyway", ssName)
// InstanceIDs is required to update vmss, use * instead here since there are no nodes actually.
instanceIDs.Insert("*")
}
err := ss.ensureHostsInVMSetPool(service, backendPoolID, ssName, instanceIDs.List(), isInternal)
if err != nil {
klog.Errorf("ensureHostsInVMSetPool() with scaleSet %q for service %q failed: %v", ssName, serviceName, err)
return err
}
}
if ss.useStandardLoadBalancer() && len(standardNodes) > 0 {
err := ss.availabilitySet.EnsureHostsInPool(service, standardNodes, backendPoolID, "", isInternal)
if err != nil {
klog.Errorf("availabilitySet.EnsureHostsInPool() for service %q failed: %v", serviceName, err)
return err
}
}
return nil
}
// ensureScaleSetBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified scaleset.
func (ss *scaleSet) ensureScaleSetBackendPoolDeleted(service *v1.Service, poolID, ssName string) error {
klog.V(3).Infof("ensuring backend pool %q deleted from scaleset %q", poolID, ssName)
virtualMachineScaleSet, exists, err := ss.getScaleSetWithRetry(service, ssName)
if err != nil {
klog.Errorf("ss.ensureScaleSetBackendPoolDeleted(%s, %s) getScaleSetWithRetry(%s) failed: %v", poolID, ssName, ssName, err)
return err
}
if !exists {
klog.V(2).Infof("ss.ensureScaleSetBackendPoolDeleted(%s, %s), scale set %s has already been non-exist", poolID, ssName, ssName)
return nil
}
// Find primary network interface configuration.
networkConfigureList := virtualMachineScaleSet.VirtualMachineProfile.NetworkProfile.NetworkInterfaceConfigurations
primaryNetworkConfiguration, err := ss.getPrimaryNetworkConfiguration(networkConfigureList, ssName)
if err != nil {
return err
}
// Find primary IP configuration.
primaryIPConfiguration, err := ss.getPrimaryIPConfigForScaleSet(primaryNetworkConfiguration, ssName)
if err != nil {
return err
}
// Construct new loadBalancerBackendAddressPools and remove backendAddressPools from primary IP configuration.
if primaryIPConfiguration.LoadBalancerBackendAddressPools == nil || len(*primaryIPConfiguration.LoadBalancerBackendAddressPools) == 0 {
return nil
}
existingBackendPools := *primaryIPConfiguration.LoadBalancerBackendAddressPools
newBackendPools := []compute.SubResource{}
foundPool := false
for i := len(existingBackendPools) - 1; i >= 0; i-- {
curPool := existingBackendPools[i]
if strings.EqualFold(poolID, *curPool.ID) {
klog.V(10).Infof("ensureScaleSetBackendPoolDeleted gets unwanted backend pool %q for scale set %q", poolID, ssName)
foundPool = true
newBackendPools = append(existingBackendPools[:i], existingBackendPools[i+1:]...)
}
}
if !foundPool {
// Pool not found, assume it has been already removed.
return nil
}
// Update scale set with backoff.
primaryIPConfiguration.LoadBalancerBackendAddressPools = &newBackendPools
klog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating", ssName)
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := ss.VirtualMachineScaleSetsClient.CreateOrUpdate(ctx, ss.ResourceGroup, ssName, virtualMachineScaleSet)
klog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", ssName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
klog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating, err=%v", ssName, err)
retryErr := ss.createOrUpdateVMSSWithRetry(service, virtualMachineScaleSet)
if retryErr != nil {
err = retryErr
klog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", ssName)
}
}
if err != nil {
return err
}
// Update instances to latest VMSS model.
instanceIDs := []string{"*"}
vmInstanceIDs := compute.VirtualMachineScaleSetVMInstanceRequiredIDs{
InstanceIds: &instanceIDs,
}
instanceCtx, instanceCancel := getContextWithCancel()
defer instanceCancel()
instanceResp, err := ss.VirtualMachineScaleSetsClient.UpdateInstances(instanceCtx, ss.ResourceGroup, ssName, vmInstanceIDs)
klog.V(10).Infof("VirtualMachineScaleSetsClient.UpdateInstances(%q): end", ssName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(instanceResp, err) {
klog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances scale set (%s) - updating, err=%v", ssName, err)
retryErr := ss.updateVMSSInstancesWithRetry(service, ssName, vmInstanceIDs)
if retryErr != nil {
err = retryErr
klog.V(2).Infof("VirtualMachineScaleSetsClient.UpdateInstances abort backoff: scale set (%s) - updating", ssName)
}
}
if err != nil {
return err
}
// Update virtualMachineScaleSet again. This is a workaround for removing VMSS reference from LB.
// TODO: remove this workaround when figuring out the root cause.
if len(newBackendPools) == 0 {
updateCtx, updateCancel := getContextWithCancel()
defer updateCancel()
klog.V(3).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating second time", ssName)
resp, err = ss.VirtualMachineScaleSetsClient.CreateOrUpdate(updateCtx, ss.ResourceGroup, ssName, virtualMachineScaleSet)
klog.V(10).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate(%q): end", ssName)
if ss.CloudProviderBackoff && shouldRetryHTTPRequest(resp, err) {
klog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate: scale set (%s) - updating, err=%v", ssName, err)
retryErr := ss.createOrUpdateVMSSWithRetry(service, virtualMachineScaleSet)
if retryErr != nil {
klog.V(2).Infof("VirtualMachineScaleSetsClient.CreateOrUpdate abort backoff: scale set (%s) - updating", ssName)
}
}
}
return nil
}
// EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified vmSet.
func (ss *scaleSet) EnsureBackendPoolDeleted(service *v1.Service, poolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) error {
if backendAddressPools == nil {
return nil
}
scalesets := sets.NewString()
for _, backendPool := range *backendAddressPools {
if strings.EqualFold(*backendPool.ID, poolID) && backendPool.BackendIPConfigurations != nil {
for _, ipConfigurations := range *backendPool.BackendIPConfigurations {
if ipConfigurations.ID == nil {
continue
}
ssName, err := extractScaleSetNameByProviderID(*ipConfigurations.ID)
if err != nil {
klog.V(4).Infof("backend IP configuration %q is not belonging to any vmss, omit it", *ipConfigurations.ID)
continue
}
scalesets.Insert(ssName)
}
break
}
}
for ssName := range scalesets {
// Only remove nodes belonging to specified vmSet to basic LB backends.
if !ss.useStandardLoadBalancer() && !strings.EqualFold(ssName, vmSetName) {
continue
}
err := ss.ensureScaleSetBackendPoolDeleted(service, poolID, ssName)
if err != nil {
klog.Errorf("ensureScaleSetBackendPoolDeleted() with scaleSet %q failed: %v", ssName, err)
return err
}
}
return nil
}
// getVmssMachineID returns the full identifier of a vmss virtual machine.
func (az *Cloud) getVmssMachineID(resourceGroup, scaleSetName, instanceID string) string {
return fmt.Sprintf(
vmssMachineIDTemplate,
az.SubscriptionID,
resourceGroup,
scaleSetName,
instanceID)
}