blob: 77bfdd20de266f9edacd3aaba6104b0b2139e55b [file] [log] [blame]
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package azure
import (
"context"
"fmt"
"math"
"reflect"
"strconv"
"strings"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
cloudprovider "k8s.io/cloud-provider"
serviceapi "k8s.io/kubernetes/pkg/api/v1/service"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network"
"github.com/Azure/go-autorest/autorest/to"
"k8s.io/klog"
)
const (
// ServiceAnnotationLoadBalancerInternal is the annotation used on the service
ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/azure-load-balancer-internal"
// ServiceAnnotationLoadBalancerInternalSubnet is the annotation used on the service
// to specify what subnet it is exposed on
ServiceAnnotationLoadBalancerInternalSubnet = "service.beta.kubernetes.io/azure-load-balancer-internal-subnet"
// ServiceAnnotationLoadBalancerMode is the annotation used on the service to specify the
// Azure load balancer selection based on availability sets
// There are currently three possible load balancer selection modes :
// 1. Default mode - service has no annotation ("service.beta.kubernetes.io/azure-load-balancer-mode")
// In this case the Loadbalancer of the primary Availability set is selected
// 2. "__auto__" mode - service is annotated with __auto__ value, this when loadbalancer from any availability set
// is selected which has the minimum rules associated with it.
// 3. "as1,as2" mode - this is when the load balancer from the specified availability sets is selected that has the
// minimum rules associated with it.
ServiceAnnotationLoadBalancerMode = "service.beta.kubernetes.io/azure-load-balancer-mode"
// ServiceAnnotationLoadBalancerAutoModeValue is the annotation used on the service to specify the
// Azure load balancer auto selection from the availability sets
ServiceAnnotationLoadBalancerAutoModeValue = "__auto__"
// ServiceAnnotationDNSLabelName is the annotation used on the service
// to specify the DNS label name for the service.
ServiceAnnotationDNSLabelName = "service.beta.kubernetes.io/azure-dns-label-name"
// ServiceAnnotationSharedSecurityRule is the annotation used on the service
// to specify that the service should be exposed using an Azure security rule
// that may be shared with other service, trading specificity of rules for an
// increase in the number of services that can be exposed. This relies on the
// Azure "augmented security rules" feature.
ServiceAnnotationSharedSecurityRule = "service.beta.kubernetes.io/azure-shared-securityrule"
// ServiceAnnotationLoadBalancerResourceGroup is the annotation used on the service
// to specify the resource group of load balancer objects that are not in the same resource group as the cluster.
ServiceAnnotationLoadBalancerResourceGroup = "service.beta.kubernetes.io/azure-load-balancer-resource-group"
// ServiceAnnotationAllowedServiceTag is the annotation used on the service
// to specify a list of allowed service tags separated by comma
ServiceAnnotationAllowedServiceTag = "service.beta.kubernetes.io/azure-allowed-service-tags"
// ServiceAnnotationLoadBalancerIdleTimeout is the annotation used on the service
// to specify the idle timeout for connections on the load balancer in minutes.
ServiceAnnotationLoadBalancerIdleTimeout = "service.beta.kubernetes.io/azure-load-balancer-tcp-idle-timeout"
// ServiceAnnotationLoadBalancerMixedProtocols is the annotation used on the service
// to create both TCP and UDP protocols when creating load balancer rules.
ServiceAnnotationLoadBalancerMixedProtocols = "service.beta.kubernetes.io/azure-load-balancer-mixed-protocols"
)
var (
// supportedServiceTags holds a list of supported service tags on Azure.
// Refer https://docs.microsoft.com/en-us/azure/virtual-network/security-overview#service-tags for more information.
supportedServiceTags = sets.NewString("VirtualNetwork", "VIRTUAL_NETWORK", "AzureLoadBalancer", "AZURE_LOADBALANCER",
"Internet", "INTERNET", "AzureTrafficManager", "Storage", "Sql")
)
// GetLoadBalancer returns whether the specified load balancer exists, and
// if so, what its status is.
func (az *Cloud) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) {
_, status, exists, err = az.getServiceLoadBalancer(service, clusterName, nil, false)
if err != nil {
return nil, false, err
}
if !exists {
serviceName := getServiceName(service)
klog.V(5).Infof("getloadbalancer (cluster:%s) (service:%s) - doesn't exist", clusterName, serviceName)
return nil, false, nil
}
return status, true, nil
}
func getPublicIPDomainNameLabel(service *v1.Service) string {
if labelName, found := service.Annotations[ServiceAnnotationDNSLabelName]; found {
return labelName
}
return ""
}
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
// When a client updates the internal load balancer annotation,
// the service may be switched from an internal LB to a public one, or vise versa.
// Here we'll firstly ensure service do not lie in the opposite LB.
serviceName := getServiceName(service)
klog.V(5).Infof("ensureloadbalancer(%s): START clusterName=%q", serviceName, clusterName)
lb, err := az.reconcileLoadBalancer(clusterName, service, nodes, true /* wantLb */)
if err != nil {
return nil, err
}
lbStatus, err := az.getServiceLoadBalancerStatus(service, lb)
if err != nil {
return nil, err
}
var serviceIP *string
if lbStatus != nil && len(lbStatus.Ingress) > 0 {
serviceIP = &lbStatus.Ingress[0].IP
}
klog.V(2).Infof("EnsureLoadBalancer: reconciling security group for service %q with IP %q, wantLb = true", serviceName, logSafe(serviceIP))
if _, err := az.reconcileSecurityGroup(clusterName, service, serviceIP, true /* wantLb */); err != nil {
return nil, err
}
updateService := updateServiceLoadBalancerIP(service, to.String(serviceIP))
flippedService := flipServiceInternalAnnotation(updateService)
if _, err := az.reconcileLoadBalancer(clusterName, flippedService, nil, false /* wantLb */); err != nil {
return nil, err
}
if _, err := az.reconcilePublicIP(clusterName, updateService, lb, true /* wantLb */); err != nil {
return nil, err
}
return lbStatus, nil
}
// UpdateLoadBalancer updates hosts under the specified load balancer.
func (az *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
_, err := az.EnsureLoadBalancer(ctx, clusterName, service, nodes)
return err
}
// EnsureLoadBalancerDeleted deletes the specified load balancer if it
// exists, returning nil if the load balancer specified either didn't exist or
// was successfully deleted.
// This construction is useful because many cloud providers' load balancers
// have multiple underlying components, meaning a Get could say that the LB
// doesn't exist even if some part of it is still laying around.
func (az *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
isInternal := requiresInternalLoadBalancer(service)
serviceName := getServiceName(service)
klog.V(5).Infof("delete(%s): START clusterName=%q", serviceName, clusterName)
serviceIPToCleanup, err := az.findServiceIPAddress(ctx, clusterName, service, isInternal)
if err != nil {
return err
}
klog.V(2).Infof("EnsureLoadBalancerDeleted: reconciling security group for service %q with IP %q, wantLb = false", serviceName, serviceIPToCleanup)
if _, err := az.reconcileSecurityGroup(clusterName, service, &serviceIPToCleanup, false /* wantLb */); err != nil {
return err
}
if _, err := az.reconcileLoadBalancer(clusterName, service, nil, false /* wantLb */); err != nil {
return err
}
if _, err := az.reconcilePublicIP(clusterName, service, nil, false /* wantLb */); err != nil {
return err
}
klog.V(2).Infof("delete(%s): FINISH", serviceName)
return nil
}
// GetLoadBalancerName returns the LoadBalancer name.
func (az *Cloud) GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string {
// TODO: replace DefaultLoadBalancerName to generate more meaningful loadbalancer names.
return cloudprovider.DefaultLoadBalancerName(service)
}
// getServiceLoadBalancer gets the loadbalancer for the service if it already exists.
// If wantLb is TRUE then -it selects a new load balancer.
// In case the selected load balancer does not exist it returns network.LoadBalancer struct
// with added metadata (such as name, location) and existsLB set to FALSE.
// By default - cluster default LB is returned.
func (az *Cloud) getServiceLoadBalancer(service *v1.Service, clusterName string, nodes []*v1.Node, wantLb bool) (lb *network.LoadBalancer, status *v1.LoadBalancerStatus, exists bool, err error) {
isInternal := requiresInternalLoadBalancer(service)
var defaultLB *network.LoadBalancer
primaryVMSetName := az.vmSet.GetPrimaryVMSetName()
defaultLBName := az.getAzureLoadBalancerName(clusterName, primaryVMSetName, isInternal)
existingLBs, err := az.ListLBWithRetry(service)
if err != nil {
return nil, nil, false, err
}
// check if the service already has a load balancer
if existingLBs != nil {
for i := range existingLBs {
existingLB := existingLBs[i]
if strings.EqualFold(*existingLB.Name, defaultLBName) {
defaultLB = &existingLB
}
if isInternalLoadBalancer(&existingLB) != isInternal {
continue
}
status, err = az.getServiceLoadBalancerStatus(service, &existingLB)
if err != nil {
return nil, nil, false, err
}
if status == nil {
// service is not on this load balancer
continue
}
return &existingLB, status, true, nil
}
}
hasMode, _, _ := getServiceLoadBalancerMode(service)
if az.useStandardLoadBalancer() && hasMode {
return nil, nil, false, fmt.Errorf("standard load balancer doesn't work with annotation %q", ServiceAnnotationLoadBalancerMode)
}
// service does not have a basic load balancer, select one.
// Standard load balancer doesn't need this because all backends nodes should be added to same LB.
if wantLb && !az.useStandardLoadBalancer() {
// select new load balancer for service
selectedLB, exists, err := az.selectLoadBalancer(clusterName, service, &existingLBs, nodes)
if err != nil {
return nil, nil, false, err
}
return selectedLB, nil, exists, err
}
// create a default LB with meta data if not present
if defaultLB == nil {
defaultLB = &network.LoadBalancer{
Name: &defaultLBName,
Location: &az.Location,
LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{},
}
if az.useStandardLoadBalancer() {
defaultLB.Sku = &network.LoadBalancerSku{
Name: network.LoadBalancerSkuNameStandard,
}
}
}
return defaultLB, nil, false, nil
}
// selectLoadBalancer selects load balancer for the service in the cluster.
// The selection algorithm selects the load balancer which currently has
// the minimum lb rules. If there are multiple LBs with same number of rules,
// then selects the first one (sorted based on name).
func (az *Cloud) selectLoadBalancer(clusterName string, service *v1.Service, existingLBs *[]network.LoadBalancer, nodes []*v1.Node) (selectedLB *network.LoadBalancer, existsLb bool, err error) {
isInternal := requiresInternalLoadBalancer(service)
serviceName := getServiceName(service)
klog.V(2).Infof("selectLoadBalancer for service (%s): isInternal(%v) - start", serviceName, isInternal)
vmSetNames, err := az.vmSet.GetVMSetNames(service, nodes)
if err != nil {
klog.Errorf("az.selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - az.GetVMSetNames failed, err=(%v)", clusterName, serviceName, isInternal, err)
return nil, false, err
}
klog.Infof("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - vmSetNames %v", clusterName, serviceName, isInternal, *vmSetNames)
mapExistingLBs := map[string]network.LoadBalancer{}
for _, lb := range *existingLBs {
mapExistingLBs[*lb.Name] = lb
}
selectedLBRuleCount := math.MaxInt32
for _, currASName := range *vmSetNames {
currLBName := az.getAzureLoadBalancerName(clusterName, currASName, isInternal)
lb, exists := mapExistingLBs[currLBName]
if !exists {
// select this LB as this is a new LB and will have minimum rules
// create tmp lb struct to hold metadata for the new load-balancer
selectedLB = &network.LoadBalancer{
Name: &currLBName,
Location: &az.Location,
LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{},
}
return selectedLB, false, nil
}
lbRules := *lb.LoadBalancingRules
currLBRuleCount := 0
if lbRules != nil {
currLBRuleCount = len(lbRules)
}
if currLBRuleCount < selectedLBRuleCount {
selectedLBRuleCount = currLBRuleCount
selectedLB = &lb
}
}
if selectedLB == nil {
err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - unable to find load balancer for selected VM sets %v", clusterName, serviceName, isInternal, *vmSetNames)
klog.Error(err)
return nil, false, err
}
// validate if the selected LB has not exceeded the MaximumLoadBalancerRuleCount
if az.Config.MaximumLoadBalancerRuleCount != 0 && selectedLBRuleCount >= az.Config.MaximumLoadBalancerRuleCount {
err = fmt.Errorf("selectLoadBalancer: cluster(%s) service(%s) isInternal(%t) - all available load balancers have exceeded maximum rule limit %d, vmSetNames (%v)", clusterName, serviceName, isInternal, selectedLBRuleCount, *vmSetNames)
klog.Error(err)
return selectedLB, existsLb, err
}
return selectedLB, existsLb, nil
}
func (az *Cloud) getServiceLoadBalancerStatus(service *v1.Service, lb *network.LoadBalancer) (status *v1.LoadBalancerStatus, err error) {
if lb == nil {
klog.V(10).Info("getServiceLoadBalancerStatus: lb is nil")
return nil, nil
}
if lb.FrontendIPConfigurations == nil || *lb.FrontendIPConfigurations == nil {
klog.V(10).Info("getServiceLoadBalancerStatus: lb.FrontendIPConfigurations is nil")
return nil, nil
}
isInternal := requiresInternalLoadBalancer(service)
lbFrontendIPConfigName := az.getFrontendIPConfigName(service, subnet(service))
serviceName := getServiceName(service)
for _, ipConfiguration := range *lb.FrontendIPConfigurations {
if lbFrontendIPConfigName == *ipConfiguration.Name {
var lbIP *string
if isInternal {
lbIP = ipConfiguration.PrivateIPAddress
} else {
if ipConfiguration.PublicIPAddress == nil {
return nil, fmt.Errorf("get(%s): lb(%s) - failed to get LB PublicIPAddress is Nil", serviceName, *lb.Name)
}
pipID := ipConfiguration.PublicIPAddress.ID
if pipID == nil {
return nil, fmt.Errorf("get(%s): lb(%s) - failed to get LB PublicIPAddress ID is Nil", serviceName, *lb.Name)
}
pipName, err := getLastSegment(*pipID)
if err != nil {
return nil, fmt.Errorf("get(%s): lb(%s) - failed to get LB PublicIPAddress Name from ID(%s)", serviceName, *lb.Name, *pipID)
}
pip, existsPip, err := az.getPublicIPAddress(az.getPublicIPAddressResourceGroup(service), pipName)
if err != nil {
return nil, err
}
if existsPip {
lbIP = pip.IPAddress
}
}
klog.V(2).Infof("getServiceLoadBalancerStatus gets ingress IP %q from frontendIPConfiguration %q for service %q", to.String(lbIP), lbFrontendIPConfigName, serviceName)
return &v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{{IP: to.String(lbIP)}}}, nil
}
}
return nil, nil
}
func (az *Cloud) determinePublicIPName(clusterName string, service *v1.Service) (string, error) {
loadBalancerIP := service.Spec.LoadBalancerIP
if len(loadBalancerIP) == 0 {
return az.getPublicIPName(clusterName, service), nil
}
pipResourceGroup := az.getPublicIPAddressResourceGroup(service)
pips, err := az.ListPIPWithRetry(service, pipResourceGroup)
if err != nil {
return "", err
}
for _, pip := range pips {
if pip.PublicIPAddressPropertiesFormat.IPAddress != nil &&
*pip.PublicIPAddressPropertiesFormat.IPAddress == loadBalancerIP {
return *pip.Name, nil
}
}
return "", fmt.Errorf("user supplied IP Address %s was not found in resource group %s", loadBalancerIP, pipResourceGroup)
}
func flipServiceInternalAnnotation(service *v1.Service) *v1.Service {
copyService := service.DeepCopy()
if copyService.Annotations == nil {
copyService.Annotations = map[string]string{}
}
if v, ok := copyService.Annotations[ServiceAnnotationLoadBalancerInternal]; ok && v == "true" {
// If it is internal now, we make it external by remove the annotation
delete(copyService.Annotations, ServiceAnnotationLoadBalancerInternal)
} else {
// If it is external now, we make it internal
copyService.Annotations[ServiceAnnotationLoadBalancerInternal] = "true"
}
return copyService
}
func updateServiceLoadBalancerIP(service *v1.Service, serviceIP string) *v1.Service {
copyService := service.DeepCopy()
if len(serviceIP) > 0 && copyService != nil {
copyService.Spec.LoadBalancerIP = serviceIP
}
return copyService
}
func (az *Cloud) findServiceIPAddress(ctx context.Context, clusterName string, service *v1.Service, isInternalLb bool) (string, error) {
if len(service.Spec.LoadBalancerIP) > 0 {
return service.Spec.LoadBalancerIP, nil
}
lbStatus, existsLb, err := az.GetLoadBalancer(ctx, clusterName, service)
if err != nil {
return "", err
}
if !existsLb {
klog.V(2).Infof("Expected to find an IP address for service %s but did not. Assuming it has been removed", service.Name)
return "", nil
}
if len(lbStatus.Ingress) < 1 {
klog.V(2).Infof("Expected to find an IP address for service %s but it had no ingresses. Assuming it has been removed", service.Name)
return "", nil
}
return lbStatus.Ingress[0].IP, nil
}
func (az *Cloud) ensurePublicIPExists(service *v1.Service, pipName string, domainNameLabel string) (*network.PublicIPAddress, error) {
pipResourceGroup := az.getPublicIPAddressResourceGroup(service)
pip, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName)
if err != nil {
return nil, err
}
if existsPip {
return &pip, nil
}
serviceName := getServiceName(service)
pip.Name = to.StringPtr(pipName)
pip.Location = to.StringPtr(az.Location)
pip.PublicIPAddressPropertiesFormat = &network.PublicIPAddressPropertiesFormat{
PublicIPAllocationMethod: network.Static,
}
if len(domainNameLabel) > 0 {
pip.PublicIPAddressPropertiesFormat.DNSSettings = &network.PublicIPAddressDNSSettings{
DomainNameLabel: &domainNameLabel,
}
}
pip.Tags = map[string]*string{"service": &serviceName}
if az.useStandardLoadBalancer() {
pip.Sku = &network.PublicIPAddressSku{
Name: network.PublicIPAddressSkuNameStandard,
}
}
klog.V(2).Infof("ensurePublicIPExists for service(%s): pip(%s) - creating", serviceName, *pip.Name)
klog.V(10).Infof("CreateOrUpdatePIPWithRetry(%s, %q): start", pipResourceGroup, *pip.Name)
err = az.CreateOrUpdatePIPWithRetry(service, pipResourceGroup, pip)
if err != nil {
klog.V(2).Infof("ensure(%s) abort backoff: pip(%s) - creating", serviceName, *pip.Name)
return nil, err
}
klog.V(10).Infof("CreateOrUpdatePIPWithRetry(%s, %q): end", pipResourceGroup, *pip.Name)
ctx, cancel := getContextWithCancel()
defer cancel()
pip, err = az.PublicIPAddressesClient.Get(ctx, pipResourceGroup, *pip.Name, "")
if err != nil {
return nil, err
}
return &pip, nil
}
func getIdleTimeout(s *v1.Service) (*int32, error) {
const (
min = 4
max = 30
)
val, ok := s.Annotations[ServiceAnnotationLoadBalancerIdleTimeout]
if !ok {
// Return a nil here as this will set the value to the azure default
return nil, nil
}
errInvalidTimeout := fmt.Errorf("idle timeout value must be a whole number representing minutes between %d and %d", min, max)
to, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("error parsing idle timeout value: %v: %v", err, errInvalidTimeout)
}
to32 := int32(to)
if to32 < min || to32 > max {
return nil, errInvalidTimeout
}
return &to32, nil
}
func (az *Cloud) isFrontendIPChanged(clusterName string, config network.FrontendIPConfiguration, service *v1.Service, lbFrontendIPConfigName string) (bool, error) {
if az.serviceOwnsFrontendIP(config, service) && !strings.EqualFold(to.String(config.Name), lbFrontendIPConfigName) {
return true, nil
}
if !strings.EqualFold(to.String(config.Name), lbFrontendIPConfigName) {
return false, nil
}
loadBalancerIP := service.Spec.LoadBalancerIP
isInternal := requiresInternalLoadBalancer(service)
if isInternal {
// Judge subnet
subnetName := subnet(service)
if subnetName != nil {
subnet, existsSubnet, err := az.getSubnet(az.VnetName, *subnetName)
if err != nil {
return false, err
}
if !existsSubnet {
return false, fmt.Errorf("failed to get subnet")
}
if config.Subnet != nil && !strings.EqualFold(to.String(config.Subnet.Name), to.String(subnet.Name)) {
return true, nil
}
}
if loadBalancerIP == "" {
return config.PrivateIPAllocationMethod == network.Static, nil
}
return config.PrivateIPAllocationMethod != network.Static || !strings.EqualFold(loadBalancerIP, to.String(config.PrivateIPAddress)), nil
}
if loadBalancerIP == "" {
return false, nil
}
pipName, err := az.determinePublicIPName(clusterName, service)
if err != nil {
return false, err
}
pipResourceGroup := az.getPublicIPAddressResourceGroup(service)
pip, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName)
if err != nil {
return false, err
}
if !existsPip {
return true, nil
}
return config.PublicIPAddress != nil && !strings.EqualFold(to.String(pip.ID), to.String(config.PublicIPAddress.ID)), nil
}
// This ensures load balancer exists and the frontend ip config is setup.
// This also reconciles the Service's Ports with the LoadBalancer config.
// This entails adding rules/probes for expected Ports and removing stale rules/ports.
// nodes only used if wantLb is true
func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node, wantLb bool) (*network.LoadBalancer, error) {
isInternal := requiresInternalLoadBalancer(service)
serviceName := getServiceName(service)
klog.V(2).Infof("reconcileLoadBalancer for service(%s) - wantLb(%t): started", serviceName, wantLb)
lb, _, _, err := az.getServiceLoadBalancer(service, clusterName, nodes, wantLb)
if err != nil {
klog.Errorf("reconcileLoadBalancer: failed to get load balancer for service %q, error: %v", serviceName, err)
return nil, err
}
lbName := *lb.Name
klog.V(2).Infof("reconcileLoadBalancer for service(%s): lb(%s) wantLb(%t) resolved load balancer name", serviceName, lbName, wantLb)
lbFrontendIPConfigName := az.getFrontendIPConfigName(service, subnet(service))
lbFrontendIPConfigID := az.getFrontendIPConfigID(lbName, lbFrontendIPConfigName)
lbBackendPoolName := getBackendPoolName(clusterName)
lbBackendPoolID := az.getBackendPoolID(lbName, lbBackendPoolName)
lbIdleTimeout, err := getIdleTimeout(service)
if err != nil {
return nil, err
}
dirtyLb := false
// Ensure LoadBalancer's Backend Pool Configuration
if wantLb {
newBackendPools := []network.BackendAddressPool{}
if lb.BackendAddressPools != nil {
newBackendPools = *lb.BackendAddressPools
}
foundBackendPool := false
for _, bp := range newBackendPools {
if strings.EqualFold(*bp.Name, lbBackendPoolName) {
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb backendpool - found wanted backendpool. not adding anything", serviceName, wantLb)
foundBackendPool = true
break
} else {
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb backendpool - found other backendpool %s", serviceName, wantLb, *bp.Name)
}
}
if !foundBackendPool {
newBackendPools = append(newBackendPools, network.BackendAddressPool{
Name: to.StringPtr(lbBackendPoolName),
})
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb backendpool - adding backendpool", serviceName, wantLb)
dirtyLb = true
lb.BackendAddressPools = &newBackendPools
}
}
// Ensure LoadBalancer's Frontend IP Configurations
dirtyConfigs := false
newConfigs := []network.FrontendIPConfiguration{}
if lb.FrontendIPConfigurations != nil {
newConfigs = *lb.FrontendIPConfigurations
}
if !wantLb {
for i := len(newConfigs) - 1; i >= 0; i-- {
config := newConfigs[i]
if az.serviceOwnsFrontendIP(config, service) {
klog.V(2).Infof("reconcileLoadBalancer for service (%s)(%t): lb frontendconfig(%s) - dropping", serviceName, wantLb, lbFrontendIPConfigName)
newConfigs = append(newConfigs[:i], newConfigs[i+1:]...)
dirtyConfigs = true
}
}
} else {
for i := len(newConfigs) - 1; i >= 0; i-- {
config := newConfigs[i]
isFipChanged, err := az.isFrontendIPChanged(clusterName, config, service, lbFrontendIPConfigName)
if err != nil {
return nil, err
}
if isFipChanged {
klog.V(2).Infof("reconcileLoadBalancer for service (%s)(%t): lb frontendconfig(%s) - dropping", serviceName, wantLb, *config.Name)
newConfigs = append(newConfigs[:i], newConfigs[i+1:]...)
dirtyConfigs = true
}
}
foundConfig := false
for _, config := range newConfigs {
if strings.EqualFold(*config.Name, lbFrontendIPConfigName) {
foundConfig = true
break
}
}
if !foundConfig {
// construct FrontendIPConfigurationPropertiesFormat
var fipConfigurationProperties *network.FrontendIPConfigurationPropertiesFormat
if isInternal {
subnetName := subnet(service)
if subnetName == nil {
subnetName = &az.SubnetName
}
subnet, existsSubnet, err := az.getSubnet(az.VnetName, *subnetName)
if err != nil {
return nil, err
}
if !existsSubnet {
return nil, fmt.Errorf("ensure(%s): lb(%s) - failed to get subnet: %s/%s", serviceName, lbName, az.VnetName, az.SubnetName)
}
configProperties := network.FrontendIPConfigurationPropertiesFormat{
Subnet: &subnet,
}
loadBalancerIP := service.Spec.LoadBalancerIP
if loadBalancerIP != "" {
configProperties.PrivateIPAllocationMethod = network.Static
configProperties.PrivateIPAddress = &loadBalancerIP
} else {
// We'll need to call GetLoadBalancer later to retrieve allocated IP.
configProperties.PrivateIPAllocationMethod = network.Dynamic
}
fipConfigurationProperties = &configProperties
} else {
pipName, err := az.determinePublicIPName(clusterName, service)
if err != nil {
return nil, err
}
domainNameLabel := getPublicIPDomainNameLabel(service)
pip, err := az.ensurePublicIPExists(service, pipName, domainNameLabel)
if err != nil {
return nil, err
}
fipConfigurationProperties = &network.FrontendIPConfigurationPropertiesFormat{
PublicIPAddress: &network.PublicIPAddress{ID: pip.ID},
}
}
newConfigs = append(newConfigs,
network.FrontendIPConfiguration{
Name: to.StringPtr(lbFrontendIPConfigName),
FrontendIPConfigurationPropertiesFormat: fipConfigurationProperties,
})
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb frontendconfig(%s) - adding", serviceName, wantLb, lbFrontendIPConfigName)
dirtyConfigs = true
}
}
if dirtyConfigs {
dirtyLb = true
lb.FrontendIPConfigurations = &newConfigs
}
// update probes/rules
expectedProbes, expectedRules, err := az.reconcileLoadBalancerRule(service, wantLb, lbFrontendIPConfigID, lbBackendPoolID, lbName, lbIdleTimeout)
// remove unwanted probes
dirtyProbes := false
var updatedProbes []network.Probe
if lb.Probes != nil {
updatedProbes = *lb.Probes
}
for i := len(updatedProbes) - 1; i >= 0; i-- {
existingProbe := updatedProbes[i]
if az.serviceOwnsRule(service, *existingProbe.Name) {
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb probe(%s) - considering evicting", serviceName, wantLb, *existingProbe.Name)
keepProbe := false
if findProbe(expectedProbes, existingProbe) {
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb probe(%s) - keeping", serviceName, wantLb, *existingProbe.Name)
keepProbe = true
}
if !keepProbe {
updatedProbes = append(updatedProbes[:i], updatedProbes[i+1:]...)
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb probe(%s) - dropping", serviceName, wantLb, *existingProbe.Name)
dirtyProbes = true
}
}
}
// add missing, wanted probes
for _, expectedProbe := range expectedProbes {
foundProbe := false
if findProbe(updatedProbes, expectedProbe) {
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb probe(%s) - already exists", serviceName, wantLb, *expectedProbe.Name)
foundProbe = true
}
if !foundProbe {
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb probe(%s) - adding", serviceName, wantLb, *expectedProbe.Name)
updatedProbes = append(updatedProbes, expectedProbe)
dirtyProbes = true
}
}
if dirtyProbes {
dirtyLb = true
lb.Probes = &updatedProbes
}
// update rules
dirtyRules := false
var updatedRules []network.LoadBalancingRule
if lb.LoadBalancingRules != nil {
updatedRules = *lb.LoadBalancingRules
}
// update rules: remove unwanted
for i := len(updatedRules) - 1; i >= 0; i-- {
existingRule := updatedRules[i]
if az.serviceOwnsRule(service, *existingRule.Name) {
keepRule := false
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb rule(%s) - considering evicting", serviceName, wantLb, *existingRule.Name)
if findRule(expectedRules, existingRule) {
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb rule(%s) - keeping", serviceName, wantLb, *existingRule.Name)
keepRule = true
}
if !keepRule {
klog.V(2).Infof("reconcileLoadBalancer for service (%s)(%t): lb rule(%s) - dropping", serviceName, wantLb, *existingRule.Name)
updatedRules = append(updatedRules[:i], updatedRules[i+1:]...)
dirtyRules = true
}
}
}
// update rules: add needed
for _, expectedRule := range expectedRules {
foundRule := false
if findRule(updatedRules, expectedRule) {
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb rule(%s) - already exists", serviceName, wantLb, *expectedRule.Name)
foundRule = true
}
if !foundRule {
klog.V(10).Infof("reconcileLoadBalancer for service (%s)(%t): lb rule(%s) adding", serviceName, wantLb, *expectedRule.Name)
updatedRules = append(updatedRules, expectedRule)
dirtyRules = true
}
}
if dirtyRules {
dirtyLb = true
lb.LoadBalancingRules = &updatedRules
}
// We don't care if the LB exists or not
// We only care about if there is any change in the LB, which means dirtyLB
// If it is not exist, and no change to that, we don't CreateOrUpdate LB
if dirtyLb {
if lb.FrontendIPConfigurations == nil || len(*lb.FrontendIPConfigurations) == 0 {
// When FrontendIPConfigurations is empty, we need to delete the Azure load balancer resource itself,
// because an Azure load balancer cannot have an empty FrontendIPConfigurations collection
klog.V(2).Infof("reconcileLoadBalancer for service(%s): lb(%s) - deleting; no remaining frontendIPConfigurations", serviceName, lbName)
// Remove backend pools from vmSets. This is required for virtual machine scale sets before removing the LB.
vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName)
klog.V(10).Infof("EnsureBackendPoolDeleted(%s, %s): start", lbBackendPoolID, vmSetName)
err := az.vmSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, lb.BackendAddressPools)
if err != nil {
klog.Errorf("EnsureBackendPoolDeleted(%s, %s) failed: %v", lbBackendPoolID, vmSetName, err)
return nil, err
}
klog.V(10).Infof("EnsureBackendPoolDeleted(%s, %s): end", lbBackendPoolID, vmSetName)
// Remove the LB.
klog.V(10).Infof("reconcileLoadBalancer: az.DeleteLBWithRetry(%q): start", lbName)
err = az.DeleteLBWithRetry(service, lbName)
if err != nil {
klog.V(2).Infof("reconcileLoadBalancer for service(%s) abort backoff: lb(%s) - deleting; no remaining frontendIPConfigurations", serviceName, lbName)
return nil, err
}
klog.V(10).Infof("az.DeleteLBWithRetry(%q): end", lbName)
} else {
klog.V(2).Infof("reconcileLoadBalancer: reconcileLoadBalancer for service(%s): lb(%s) - updating", serviceName, lbName)
err := az.CreateOrUpdateLBWithRetry(service, *lb)
if err != nil {
klog.V(2).Infof("reconcileLoadBalancer for service(%s) abort backoff: lb(%s) - updating", serviceName, lbName)
return nil, err
}
if isInternal {
// Refresh updated lb which will be used later in other places.
newLB, exist, err := az.getAzureLoadBalancer(lbName)
if err != nil {
klog.V(2).Infof("reconcileLoadBalancer for service(%s): getAzureLoadBalancer(%s) failed: %v", serviceName, lbName, err)
return nil, err
}
if !exist {
return nil, fmt.Errorf("load balancer %q not found", lbName)
}
lb = &newLB
}
}
}
if wantLb && nodes != nil {
// Add the machines to the backend pool if they're not already
vmSetName := az.mapLoadBalancerNameToVMSet(lbName, clusterName)
err := az.vmSet.EnsureHostsInPool(service, nodes, lbBackendPoolID, vmSetName, isInternal)
if err != nil {
return nil, err
}
}
klog.V(2).Infof("reconcileLoadBalancer for service(%s): lb(%s) finished", serviceName, lbName)
return lb, nil
}
func (az *Cloud) reconcileLoadBalancerRule(
service *v1.Service,
wantLb bool,
lbFrontendIPConfigID string,
lbBackendPoolID string,
lbName string,
lbIdleTimeout *int32) ([]network.Probe, []network.LoadBalancingRule, error) {
var ports []v1.ServicePort
if wantLb {
ports = service.Spec.Ports
} else {
ports = []v1.ServicePort{}
}
var expectedProbes []network.Probe
var expectedRules []network.LoadBalancingRule
for _, port := range ports {
lbRuleName := az.getLoadBalancerRuleName(service, port, subnet(service))
klog.V(2).Infof("reconcileLoadBalancerRule lb name (%s) rule name (%s)", lbName, lbRuleName)
transportProto, _, probeProto, err := getProtocolsFromKubernetesProtocol(port.Protocol)
if err != nil {
return expectedProbes, expectedRules, err
}
if serviceapi.NeedsHealthCheck(service) {
podPresencePath, podPresencePort := serviceapi.GetServiceHealthCheckPathPort(service)
expectedProbes = append(expectedProbes, network.Probe{
Name: &lbRuleName,
ProbePropertiesFormat: &network.ProbePropertiesFormat{
RequestPath: to.StringPtr(podPresencePath),
Protocol: network.ProbeProtocolHTTP,
Port: to.Int32Ptr(podPresencePort),
IntervalInSeconds: to.Int32Ptr(5),
NumberOfProbes: to.Int32Ptr(2),
},
})
} else if port.Protocol != v1.ProtocolUDP && port.Protocol != v1.ProtocolSCTP {
// we only add the expected probe if we're doing TCP
expectedProbes = append(expectedProbes, network.Probe{
Name: &lbRuleName,
ProbePropertiesFormat: &network.ProbePropertiesFormat{
Protocol: *probeProto,
Port: to.Int32Ptr(port.NodePort),
IntervalInSeconds: to.Int32Ptr(5),
NumberOfProbes: to.Int32Ptr(2),
},
})
}
loadDistribution := network.Default
if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP {
loadDistribution = network.SourceIP
}
expectedRule := network.LoadBalancingRule{
Name: &lbRuleName,
LoadBalancingRulePropertiesFormat: &network.LoadBalancingRulePropertiesFormat{
Protocol: *transportProto,
FrontendIPConfiguration: &network.SubResource{
ID: to.StringPtr(lbFrontendIPConfigID),
},
BackendAddressPool: &network.SubResource{
ID: to.StringPtr(lbBackendPoolID),
},
LoadDistribution: loadDistribution,
FrontendPort: to.Int32Ptr(port.Port),
BackendPort: to.Int32Ptr(port.Port),
EnableFloatingIP: to.BoolPtr(true),
},
}
if port.Protocol == v1.ProtocolTCP {
expectedRule.LoadBalancingRulePropertiesFormat.IdleTimeoutInMinutes = lbIdleTimeout
}
// we didn't construct the probe objects for UDP or SCTP because they're not used/needed/allowed
if port.Protocol != v1.ProtocolUDP && port.Protocol != v1.ProtocolSCTP {
expectedRule.Probe = &network.SubResource{
ID: to.StringPtr(az.getLoadBalancerProbeID(lbName, lbRuleName)),
}
}
expectedRules = append(expectedRules, expectedRule)
}
return expectedProbes, expectedRules, nil
}
// This reconciles the Network Security Group similar to how the LB is reconciled.
// This entails adding required, missing SecurityRules and removing stale rules.
func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service, lbIP *string, wantLb bool) (*network.SecurityGroup, error) {
serviceName := getServiceName(service)
klog.V(5).Infof("reconcileSecurityGroup(%s): START clusterName=%q", serviceName, clusterName)
ports := service.Spec.Ports
if ports == nil {
if useSharedSecurityRule(service) {
klog.V(2).Infof("Attempting to reconcile security group for service %s, but service uses shared rule and we don't know which port it's for", service.Name)
return nil, fmt.Errorf("No port info for reconciling shared rule for service %s", service.Name)
}
ports = []v1.ServicePort{}
}
sg, err := az.getSecurityGroup()
if err != nil {
return nil, err
}
destinationIPAddress := ""
if wantLb && lbIP == nil {
return nil, fmt.Errorf("No load balancer IP for setting up security rules for service %s", service.Name)
}
if lbIP != nil {
destinationIPAddress = *lbIP
}
if destinationIPAddress == "" {
destinationIPAddress = "*"
}
sourceRanges, err := serviceapi.GetLoadBalancerSourceRanges(service)
if err != nil {
return nil, err
}
serviceTags, err := getServiceTags(service)
if err != nil {
return nil, err
}
var sourceAddressPrefixes []string
if (sourceRanges == nil || serviceapi.IsAllowAll(sourceRanges)) && len(serviceTags) == 0 {
if !requiresInternalLoadBalancer(service) {
sourceAddressPrefixes = []string{"Internet"}
}
} else {
for _, ip := range sourceRanges {
sourceAddressPrefixes = append(sourceAddressPrefixes, ip.String())
}
for _, serviceTag := range serviceTags {
sourceAddressPrefixes = append(sourceAddressPrefixes, serviceTag)
}
}
expectedSecurityRules := []network.SecurityRule{}
if wantLb {
expectedSecurityRules = make([]network.SecurityRule, len(ports)*len(sourceAddressPrefixes))
for i, port := range ports {
_, securityProto, _, err := getProtocolsFromKubernetesProtocol(port.Protocol)
if err != nil {
return nil, err
}
for j := range sourceAddressPrefixes {
ix := i*len(sourceAddressPrefixes) + j
securityRuleName := az.getSecurityRuleName(service, port, sourceAddressPrefixes[j])
expectedSecurityRules[ix] = network.SecurityRule{
Name: to.StringPtr(securityRuleName),
SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{
Protocol: *securityProto,
SourcePortRange: to.StringPtr("*"),
DestinationPortRange: to.StringPtr(strconv.Itoa(int(port.Port))),
SourceAddressPrefix: to.StringPtr(sourceAddressPrefixes[j]),
DestinationAddressPrefix: to.StringPtr(destinationIPAddress),
Access: network.SecurityRuleAccessAllow,
Direction: network.SecurityRuleDirectionInbound,
},
}
}
}
}
for _, r := range expectedSecurityRules {
klog.V(10).Infof("Expecting security rule for %s: %s:%s -> %s:%s", service.Name, *r.SourceAddressPrefix, *r.SourcePortRange, *r.DestinationAddressPrefix, *r.DestinationPortRange)
}
// update security rules
dirtySg := false
var updatedRules []network.SecurityRule
if sg.SecurityGroupPropertiesFormat != nil && sg.SecurityGroupPropertiesFormat.SecurityRules != nil {
updatedRules = *sg.SecurityGroupPropertiesFormat.SecurityRules
}
for _, r := range updatedRules {
klog.V(10).Infof("Existing security rule while processing %s: %s:%s -> %s:%s", service.Name, logSafe(r.SourceAddressPrefix), logSafe(r.SourcePortRange), logSafeCollection(r.DestinationAddressPrefix, r.DestinationAddressPrefixes), logSafe(r.DestinationPortRange))
}
// update security rules: remove unwanted rules that belong privately
// to this service
for i := len(updatedRules) - 1; i >= 0; i-- {
existingRule := updatedRules[i]
if az.serviceOwnsRule(service, *existingRule.Name) {
klog.V(10).Infof("reconcile(%s)(%t): sg rule(%s) - considering evicting", serviceName, wantLb, *existingRule.Name)
keepRule := false
if findSecurityRule(expectedSecurityRules, existingRule) {
klog.V(10).Infof("reconcile(%s)(%t): sg rule(%s) - keeping", serviceName, wantLb, *existingRule.Name)
keepRule = true
}
if !keepRule {
klog.V(10).Infof("reconcile(%s)(%t): sg rule(%s) - dropping", serviceName, wantLb, *existingRule.Name)
updatedRules = append(updatedRules[:i], updatedRules[i+1:]...)
dirtySg = true
}
}
}
// update security rules: if the service uses a shared rule and is being deleted,
// then remove it from the shared rule
if useSharedSecurityRule(service) && !wantLb {
for _, port := range ports {
for _, sourceAddressPrefix := range sourceAddressPrefixes {
sharedRuleName := az.getSecurityRuleName(service, port, sourceAddressPrefix)
sharedIndex, sharedRule, sharedRuleFound := findSecurityRuleByName(updatedRules, sharedRuleName)
if !sharedRuleFound {
klog.V(4).Infof("Expected to find shared rule %s for service %s being deleted, but did not", sharedRuleName, service.Name)
return nil, fmt.Errorf("Expected to find shared rule %s for service %s being deleted, but did not", sharedRuleName, service.Name)
}
if sharedRule.DestinationAddressPrefixes == nil {
klog.V(4).Infof("Expected to have array of destinations in shared rule for service %s being deleted, but did not", service.Name)
return nil, fmt.Errorf("Expected to have array of destinations in shared rule for service %s being deleted, but did not", service.Name)
}
existingPrefixes := *sharedRule.DestinationAddressPrefixes
addressIndex, found := findIndex(existingPrefixes, destinationIPAddress)
if !found {
klog.V(4).Infof("Expected to find destination address %s in shared rule %s for service %s being deleted, but did not", destinationIPAddress, sharedRuleName, service.Name)
return nil, fmt.Errorf("Expected to find destination address %s in shared rule %s for service %s being deleted, but did not", destinationIPAddress, sharedRuleName, service.Name)
}
if len(existingPrefixes) == 1 {
updatedRules = append(updatedRules[:sharedIndex], updatedRules[sharedIndex+1:]...)
} else {
newDestinations := append(existingPrefixes[:addressIndex], existingPrefixes[addressIndex+1:]...)
sharedRule.DestinationAddressPrefixes = &newDestinations
updatedRules[sharedIndex] = sharedRule
}
dirtySg = true
}
}
}
// update security rules: prepare rules for consolidation
for index, rule := range updatedRules {
if allowsConsolidation(rule) {
updatedRules[index] = makeConsolidatable(rule)
}
}
for index, rule := range expectedSecurityRules {
if allowsConsolidation(rule) {
expectedSecurityRules[index] = makeConsolidatable(rule)
}
}
// update security rules: add needed
for _, expectedRule := range expectedSecurityRules {
foundRule := false
if findSecurityRule(updatedRules, expectedRule) {
klog.V(10).Infof("reconcile(%s)(%t): sg rule(%s) - already exists", serviceName, wantLb, *expectedRule.Name)
foundRule = true
}
if foundRule && allowsConsolidation(expectedRule) {
index, _ := findConsolidationCandidate(updatedRules, expectedRule)
updatedRules[index] = consolidate(updatedRules[index], expectedRule)
dirtySg = true
}
if !foundRule {
klog.V(10).Infof("reconcile(%s)(%t): sg rule(%s) - adding", serviceName, wantLb, *expectedRule.Name)
nextAvailablePriority, err := getNextAvailablePriority(updatedRules)
if err != nil {
return nil, err
}
expectedRule.Priority = to.Int32Ptr(nextAvailablePriority)
updatedRules = append(updatedRules, expectedRule)
dirtySg = true
}
}
for _, r := range updatedRules {
klog.V(10).Infof("Updated security rule while processing %s: %s:%s -> %s:%s", service.Name, logSafe(r.SourceAddressPrefix), logSafe(r.SourcePortRange), logSafeCollection(r.DestinationAddressPrefix, r.DestinationAddressPrefixes), logSafe(r.DestinationPortRange))
}
if dirtySg {
sg.SecurityRules = &updatedRules
klog.V(2).Infof("reconcileSecurityGroup for service(%s): sg(%s) - updating", serviceName, *sg.Name)
klog.V(10).Infof("CreateOrUpdateSGWithRetry(%q): start", *sg.Name)
err := az.CreateOrUpdateSGWithRetry(service, sg)
if err != nil {
klog.V(2).Infof("ensure(%s) abort backoff: sg(%s) - updating", serviceName, *sg.Name)
// TODO (Nov 2017): remove when augmented security rules are out of preview
// we could try to parse the response but it's not worth it for bridging a preview
errorDescription := err.Error()
if strings.Contains(errorDescription, "SubscriptionNotRegisteredForFeature") && strings.Contains(errorDescription, "Microsoft.Network/AllowAccessRuleExtendedProperties") {
sharedRuleError := fmt.Errorf("Shared security rules are not available in this Azure region. Details: %v", errorDescription)
return nil, sharedRuleError
}
// END TODO
return nil, err
}
klog.V(10).Infof("CreateOrUpdateSGWithRetry(%q): end", *sg.Name)
}
return &sg, nil
}
func logSafe(s *string) string {
if s == nil {
return "(nil)"
}
return *s
}
func logSafeCollection(s *string, strs *[]string) string {
if s == nil {
if strs == nil {
return "(nil)"
}
return "[" + strings.Join(*strs, ",") + "]"
}
return *s
}
func findSecurityRuleByName(rules []network.SecurityRule, ruleName string) (int, network.SecurityRule, bool) {
for index, rule := range rules {
if rule.Name != nil && strings.EqualFold(*rule.Name, ruleName) {
return index, rule, true
}
}
return 0, network.SecurityRule{}, false
}
func findIndex(strs []string, s string) (int, bool) {
for index, str := range strs {
if strings.EqualFold(str, s) {
return index, true
}
}
return 0, false
}
func allowsConsolidation(rule network.SecurityRule) bool {
return strings.HasPrefix(to.String(rule.Name), "shared")
}
func findConsolidationCandidate(rules []network.SecurityRule, rule network.SecurityRule) (int, bool) {
for index, r := range rules {
if allowsConsolidation(r) {
if strings.EqualFold(to.String(r.Name), to.String(rule.Name)) {
return index, true
}
}
}
return 0, false
}
func makeConsolidatable(rule network.SecurityRule) network.SecurityRule {
return network.SecurityRule{
Name: rule.Name,
SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{
Priority: rule.Priority,
Protocol: rule.Protocol,
SourcePortRange: rule.SourcePortRange,
SourcePortRanges: rule.SourcePortRanges,
DestinationPortRange: rule.DestinationPortRange,
DestinationPortRanges: rule.DestinationPortRanges,
SourceAddressPrefix: rule.SourceAddressPrefix,
SourceAddressPrefixes: rule.SourceAddressPrefixes,
DestinationAddressPrefixes: collectionOrSingle(rule.DestinationAddressPrefixes, rule.DestinationAddressPrefix),
Access: rule.Access,
Direction: rule.Direction,
},
}
}
func consolidate(existingRule network.SecurityRule, newRule network.SecurityRule) network.SecurityRule {
destinations := appendElements(existingRule.SecurityRulePropertiesFormat.DestinationAddressPrefixes, newRule.DestinationAddressPrefix, newRule.DestinationAddressPrefixes)
destinations = deduplicate(destinations) // there are transient conditions during controller startup where it tries to add a service that is already added
return network.SecurityRule{
Name: existingRule.Name,
SecurityRulePropertiesFormat: &network.SecurityRulePropertiesFormat{
Priority: existingRule.Priority,
Protocol: existingRule.Protocol,
SourcePortRange: existingRule.SourcePortRange,
SourcePortRanges: existingRule.SourcePortRanges,
DestinationPortRange: existingRule.DestinationPortRange,
DestinationPortRanges: existingRule.DestinationPortRanges,
SourceAddressPrefix: existingRule.SourceAddressPrefix,
SourceAddressPrefixes: existingRule.SourceAddressPrefixes,
DestinationAddressPrefixes: destinations,
Access: existingRule.Access,
Direction: existingRule.Direction,
},
}
}
func collectionOrSingle(collection *[]string, s *string) *[]string {
if collection != nil && len(*collection) > 0 {
return collection
}
if s == nil {
return &[]string{}
}
return &[]string{*s}
}
func appendElements(collection *[]string, appendString *string, appendStrings *[]string) *[]string {
newCollection := []string{}
if collection != nil {
newCollection = append(newCollection, *collection...)
}
if appendString != nil {
newCollection = append(newCollection, *appendString)
}
if appendStrings != nil {
newCollection = append(newCollection, *appendStrings...)
}
return &newCollection
}
func deduplicate(collection *[]string) *[]string {
if collection == nil {
return nil
}
seen := map[string]bool{}
result := make([]string, 0, len(*collection))
for _, v := range *collection {
if seen[v] == true {
// skip this element
} else {
seen[v] = true
result = append(result, v)
}
}
return &result
}
// This reconciles the PublicIP resources similar to how the LB is reconciled.
func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lb *network.LoadBalancer, wantLb bool) (*network.PublicIPAddress, error) {
isInternal := requiresInternalLoadBalancer(service)
serviceName := getServiceName(service)
var desiredPipName string
var err error
if !isInternal && wantLb {
desiredPipName, err = az.determinePublicIPName(clusterName, service)
if err != nil {
return nil, err
}
}
pipResourceGroup := az.getPublicIPAddressResourceGroup(service)
pips, err := az.ListPIPWithRetry(service, pipResourceGroup)
if err != nil {
return nil, err
}
for i := range pips {
pip := pips[i]
if pip.Tags != nil &&
(pip.Tags)["service"] != nil &&
*(pip.Tags)["service"] == serviceName {
// We need to process for pips belong to this service
pipName := *pip.Name
if wantLb && !isInternal && pipName == desiredPipName {
// This is the only case we should preserve the
// Public ip resource with match service tag
} else {
klog.V(2).Infof("reconcilePublicIP for service(%s): pip(%s) - deleting", serviceName, pipName)
err := az.safeDeletePublicIP(service, pipResourceGroup, &pip, lb)
if err != nil {
klog.Errorf("safeDeletePublicIP(%s) failed with error: %v", pipName, err)
return nil, err
}
klog.V(2).Infof("reconcilePublicIP for service(%s): pip(%s) - finished", serviceName, pipName)
}
}
}
if !isInternal && wantLb {
// Confirm desired public ip resource exists
var pip *network.PublicIPAddress
domainNameLabel := getPublicIPDomainNameLabel(service)
if pip, err = az.ensurePublicIPExists(service, desiredPipName, domainNameLabel); err != nil {
return nil, err
}
return pip, nil
}
return nil, nil
}
// safeDeletePublicIP deletes public IP by removing its reference first.
func (az *Cloud) safeDeletePublicIP(service *v1.Service, pipResourceGroup string, pip *network.PublicIPAddress, lb *network.LoadBalancer) error {
// Remove references if pip.IPConfiguration is not nil.
if pip.PublicIPAddressPropertiesFormat != nil &&
pip.PublicIPAddressPropertiesFormat.IPConfiguration != nil &&
lb != nil && lb.LoadBalancerPropertiesFormat != nil &&
lb.LoadBalancerPropertiesFormat.FrontendIPConfigurations != nil {
referencedLBRules := []network.SubResource{}
frontendIPConfigUpdated := false
loadBalancerRuleUpdated := false
// Check whether there are still frontend IP configurations referring to it.
ipConfigurationID := to.String(pip.PublicIPAddressPropertiesFormat.IPConfiguration.ID)
if ipConfigurationID != "" {
lbFrontendIPConfigs := *lb.LoadBalancerPropertiesFormat.FrontendIPConfigurations
for i := len(lbFrontendIPConfigs) - 1; i >= 0; i-- {
config := lbFrontendIPConfigs[i]
if strings.EqualFold(ipConfigurationID, to.String(config.ID)) {
if config.FrontendIPConfigurationPropertiesFormat != nil &&
config.FrontendIPConfigurationPropertiesFormat.LoadBalancingRules != nil {
referencedLBRules = *config.FrontendIPConfigurationPropertiesFormat.LoadBalancingRules
}
frontendIPConfigUpdated = true
lbFrontendIPConfigs = append(lbFrontendIPConfigs[:i], lbFrontendIPConfigs[i+1:]...)
break
}
}
if frontendIPConfigUpdated {
lb.LoadBalancerPropertiesFormat.FrontendIPConfigurations = &lbFrontendIPConfigs
}
}
// Check whether there are still load balancer rules referring to it.
if len(referencedLBRules) > 0 {
referencedLBRuleIDs := sets.NewString()
for _, refer := range referencedLBRules {
referencedLBRuleIDs.Insert(to.String(refer.ID))
}
if lb.LoadBalancerPropertiesFormat.LoadBalancingRules != nil {
lbRules := *lb.LoadBalancerPropertiesFormat.LoadBalancingRules
for i := len(lbRules) - 1; i >= 0; i-- {
ruleID := to.String(lbRules[i].ID)
if ruleID != "" && referencedLBRuleIDs.Has(ruleID) {
loadBalancerRuleUpdated = true
lbRules = append(lbRules[:i], lbRules[i+1:]...)
}
}
if loadBalancerRuleUpdated {
lb.LoadBalancerPropertiesFormat.LoadBalancingRules = &lbRules
}
}
}
// Update load balancer when frontendIPConfigUpdated or loadBalancerRuleUpdated.
if frontendIPConfigUpdated || loadBalancerRuleUpdated {
err := az.CreateOrUpdateLBWithRetry(service, *lb)
if err != nil {
klog.Errorf("safeDeletePublicIP for service(%s) failed with error: %v", getServiceName(service), err)
return err
}
}
}
pipName := to.String(pip.Name)
klog.V(10).Infof("DeletePublicIPWithRetry(%s, %q): start", pipResourceGroup, pipName)
err := az.DeletePublicIPWithRetry(service, pipResourceGroup, pipName)
if err != nil {
if err = ignoreStatusNotFoundFromError(err); err != nil {
return err
}
}
klog.V(10).Infof("DeletePublicIPWithRetry(%s, %q): end", pipResourceGroup, pipName)
return nil
}
func findProbe(probes []network.Probe, probe network.Probe) bool {
for _, existingProbe := range probes {
if strings.EqualFold(to.String(existingProbe.Name), to.String(probe.Name)) && to.Int32(existingProbe.Port) == to.Int32(probe.Port) {
return true
}
}
return false
}
func findRule(rules []network.LoadBalancingRule, rule network.LoadBalancingRule) bool {
for _, existingRule := range rules {
if strings.EqualFold(to.String(existingRule.Name), to.String(rule.Name)) &&
equalLoadBalancingRulePropertiesFormat(existingRule.LoadBalancingRulePropertiesFormat, rule.LoadBalancingRulePropertiesFormat) {
return true
}
}
return false
}
// equalLoadBalancingRulePropertiesFormat checks whether the provided LoadBalancingRulePropertiesFormat are equal.
// Note: only fields used in reconcileLoadBalancer are considered.
func equalLoadBalancingRulePropertiesFormat(s, t *network.LoadBalancingRulePropertiesFormat) bool {
if s == nil || t == nil {
return false
}
return reflect.DeepEqual(s.Protocol, t.Protocol) &&
reflect.DeepEqual(s.FrontendIPConfiguration, t.FrontendIPConfiguration) &&
reflect.DeepEqual(s.BackendAddressPool, t.BackendAddressPool) &&
reflect.DeepEqual(s.LoadDistribution, t.LoadDistribution) &&
reflect.DeepEqual(s.FrontendPort, t.FrontendPort) &&
reflect.DeepEqual(s.BackendPort, t.BackendPort) &&
reflect.DeepEqual(s.EnableFloatingIP, t.EnableFloatingIP) &&
reflect.DeepEqual(s.IdleTimeoutInMinutes, t.IdleTimeoutInMinutes)
}
// This compares rule's Name, Protocol, SourcePortRange, DestinationPortRange, SourceAddressPrefix, Access, and Direction.
// Note that it compares rule's DestinationAddressPrefix only when it's not consolidated rule as such rule does not have DestinationAddressPrefix defined.
// We intentionally do not compare DestinationAddressPrefixes in consolidated case because reconcileSecurityRule has to consider the two rules equal,
// despite different DestinationAddressPrefixes, in order to give it a chance to consolidate the two rules.
func findSecurityRule(rules []network.SecurityRule, rule network.SecurityRule) bool {
for _, existingRule := range rules {
if !strings.EqualFold(to.String(existingRule.Name), to.String(rule.Name)) {
continue
}
if existingRule.Protocol != rule.Protocol {
continue
}
if !strings.EqualFold(to.String(existingRule.SourcePortRange), to.String(rule.SourcePortRange)) {
continue
}
if !strings.EqualFold(to.String(existingRule.DestinationPortRange), to.String(rule.DestinationPortRange)) {
continue
}
if !strings.EqualFold(to.String(existingRule.SourceAddressPrefix), to.String(rule.SourceAddressPrefix)) {
continue
}
if !allowsConsolidation(existingRule) && !allowsConsolidation(rule) {
if !strings.EqualFold(to.String(existingRule.DestinationAddressPrefix), to.String(rule.DestinationAddressPrefix)) {
continue
}
}
if existingRule.Access != rule.Access {
continue
}
if existingRule.Direction != rule.Direction {
continue
}
return true
}
return false
}
func (az *Cloud) getPublicIPAddressResourceGroup(service *v1.Service) string {
if resourceGroup, found := service.Annotations[ServiceAnnotationLoadBalancerResourceGroup]; found {
return resourceGroup
}
return az.ResourceGroup
}
// Check if service requires an internal load balancer.
func requiresInternalLoadBalancer(service *v1.Service) bool {
if l, found := service.Annotations[ServiceAnnotationLoadBalancerInternal]; found {
return l == "true"
}
return false
}
func subnet(service *v1.Service) *string {
if requiresInternalLoadBalancer(service) {
if l, found := service.Annotations[ServiceAnnotationLoadBalancerInternalSubnet]; found {
return &l
}
}
return nil
}
// getServiceLoadBalancerMode parses the mode value.
// if the value is __auto__ it returns isAuto = TRUE.
// if anything else it returns the unique VM set names after triming spaces.
func getServiceLoadBalancerMode(service *v1.Service) (hasMode bool, isAuto bool, vmSetNames []string) {
mode, hasMode := service.Annotations[ServiceAnnotationLoadBalancerMode]
mode = strings.TrimSpace(mode)
isAuto = strings.EqualFold(mode, ServiceAnnotationLoadBalancerAutoModeValue)
if !isAuto {
// Break up list of "AS1,AS2"
vmSetParsedList := strings.Split(mode, ",")
// Trim the VM set names and remove duplicates
// e.g. {"AS1"," AS2", "AS3", "AS3"} => {"AS1", "AS2", "AS3"}
vmSetNameSet := sets.NewString()
for _, v := range vmSetParsedList {
vmSetNameSet.Insert(strings.TrimSpace(v))
}
vmSetNames = vmSetNameSet.List()
}
return hasMode, isAuto, vmSetNames
}
func useSharedSecurityRule(service *v1.Service) bool {
if l, ok := service.Annotations[ServiceAnnotationSharedSecurityRule]; ok {
return l == "true"
}
return false
}
func getServiceTags(service *v1.Service) ([]string, error) {
if serviceTags, found := service.Annotations[ServiceAnnotationAllowedServiceTag]; found {
tags := strings.Split(strings.TrimSpace(serviceTags), ",")
for _, tag := range tags {
// Storage and Sql service tags support setting regions with suffix ".Region"
if strings.HasPrefix(tag, "Storage.") || strings.HasPrefix(tag, "Sql.") {
continue
}
if !supportedServiceTags.Has(tag) {
return nil, fmt.Errorf("only %q are allowed in service tags", supportedServiceTags.List())
}
}
return tags, nil
}
return nil, nil
}