blob: 339160f1e3d35693f6e15d79a3cbea24a3c4a276 [file] [log] [blame]
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aws
import (
"crypto/sha1"
"fmt"
"reflect"
"strconv"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/elb"
"github.com/aws/aws-sdk-go/service/elbv2"
"k8s.io/klog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
)
const (
// ProxyProtocolPolicyName is the tag named used for the proxy protocol
// policy
ProxyProtocolPolicyName = "k8s-proxyprotocol-enabled"
// SSLNegotiationPolicyNameFormat is a format string used for the SSL
// negotiation policy tag name
SSLNegotiationPolicyNameFormat = "k8s-SSLNegotiationPolicy-%s"
)
var (
// Defaults for ELB Healthcheck
defaultHCHealthyThreshold = int64(2)
defaultHCUnhealthyThreshold = int64(6)
defaultHCTimeout = int64(5)
defaultHCInterval = int64(10)
)
func isNLB(annotations map[string]string) bool {
if annotations[ServiceAnnotationLoadBalancerType] == "nlb" {
return true
}
return false
}
type nlbPortMapping struct {
FrontendPort int64
TrafficPort int64
ClientCIDR string
HealthCheckPort int64
HealthCheckPath string
HealthCheckProtocol string
}
// getLoadBalancerAdditionalTags converts the comma separated list of key-value
// pairs in the ServiceAnnotationLoadBalancerAdditionalTags annotation and returns
// it as a map.
func getLoadBalancerAdditionalTags(annotations map[string]string) map[string]string {
additionalTags := make(map[string]string)
if additionalTagsList, ok := annotations[ServiceAnnotationLoadBalancerAdditionalTags]; ok {
additionalTagsList = strings.TrimSpace(additionalTagsList)
// Break up list of "Key1=Val,Key2=Val2"
tagList := strings.Split(additionalTagsList, ",")
// Break up "Key=Val"
for _, tagSet := range tagList {
tag := strings.Split(strings.TrimSpace(tagSet), "=")
// Accept "Key=val" or "Key=" or just "Key"
if len(tag) >= 2 && len(tag[0]) != 0 {
// There is a key and a value, so save it
additionalTags[tag[0]] = tag[1]
} else if len(tag) == 1 && len(tag[0]) != 0 {
// Just "Key"
additionalTags[tag[0]] = ""
}
}
}
return additionalTags
}
// ensureLoadBalancerv2 ensures a v2 load balancer is created
func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBalancerName string, mappings []nlbPortMapping, instanceIDs, subnetIDs []string, internalELB bool, annotations map[string]string) (*elbv2.LoadBalancer, error) {
loadBalancer, err := c.describeLoadBalancerv2(loadBalancerName)
if err != nil {
return nil, err
}
dirty := false
// Get additional tags set by the user
tags := getLoadBalancerAdditionalTags(annotations)
// Add default tags
tags[TagNameKubernetesService] = namespacedName.String()
tags = c.tagging.buildTags(ResourceLifecycleOwned, tags)
if loadBalancer == nil {
// Create the LB
createRequest := &elbv2.CreateLoadBalancerInput{
Type: aws.String(elbv2.LoadBalancerTypeEnumNetwork),
Name: aws.String(loadBalancerName),
}
if internalELB {
createRequest.Scheme = aws.String("internal")
}
// We are supposed to specify one subnet per AZ.
// TODO: What happens if we have more than one subnet per AZ?
createRequest.SubnetMappings = createSubnetMappings(subnetIDs)
for k, v := range tags {
createRequest.Tags = append(createRequest.Tags, &elbv2.Tag{
Key: aws.String(k), Value: aws.String(v),
})
}
klog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName)
createResponse, err := c.elbv2.CreateLoadBalancer(createRequest)
if err != nil {
return nil, fmt.Errorf("Error creating load balancer: %q", err)
}
loadBalancer = createResponse.LoadBalancers[0]
// Create Target Groups
resourceArns := make([]*string, 0, len(mappings))
for i := range mappings {
// It is easier to keep track of updates by having possibly
// duplicate target groups where the backend port is the same
_, targetGroupArn, err := c.createListenerV2(createResponse.LoadBalancers[0].LoadBalancerArn, mappings[i], namespacedName, instanceIDs, *createResponse.LoadBalancers[0].VpcId)
if err != nil {
return nil, fmt.Errorf("Error creating listener: %q", err)
}
resourceArns = append(resourceArns, targetGroupArn)
}
// Add tags to targets
targetGroupTags := make([]*elbv2.Tag, 0, len(tags))
for k, v := range tags {
targetGroupTags = append(targetGroupTags, &elbv2.Tag{
Key: aws.String(k), Value: aws.String(v),
})
}
if len(resourceArns) > 0 && len(targetGroupTags) > 0 {
// elbv2.AddTags doesn't allow to tag multiple resources at once
for _, arn := range resourceArns {
_, err = c.elbv2.AddTags(&elbv2.AddTagsInput{
ResourceArns: []*string{arn},
Tags: targetGroupTags,
})
if err != nil {
return nil, fmt.Errorf("Error adding tags after creating Load Balancer: %q", err)
}
}
}
} else {
// TODO: Sync internal vs non-internal
// sync mappings
{
listenerDescriptions, err := c.elbv2.DescribeListeners(
&elbv2.DescribeListenersInput{
LoadBalancerArn: loadBalancer.LoadBalancerArn,
},
)
if err != nil {
return nil, fmt.Errorf("Error describing listeners: %q", err)
}
// actual maps FrontendPort to an elbv2.Listener
actual := map[int64]*elbv2.Listener{}
for _, listener := range listenerDescriptions.Listeners {
actual[*listener.Port] = listener
}
actualTargetGroups, err := c.elbv2.DescribeTargetGroups(
&elbv2.DescribeTargetGroupsInput{
LoadBalancerArn: loadBalancer.LoadBalancerArn,
},
)
if err != nil {
return nil, fmt.Errorf("Error listing target groups: %q", err)
}
nodePortTargetGroup := map[int64]*elbv2.TargetGroup{}
for _, targetGroup := range actualTargetGroups.TargetGroups {
nodePortTargetGroup[*targetGroup.Port] = targetGroup
}
// Create Target Groups
addTagsInput := &elbv2.AddTagsInput{
ResourceArns: []*string{},
Tags: []*elbv2.Tag{},
}
// Handle additions/modifications
for _, mapping := range mappings {
frontendPort := mapping.FrontendPort
nodePort := mapping.TrafficPort
// modifications
if listener, ok := actual[frontendPort]; ok {
// nodePort must have changed, we'll need to delete old TG
// and recreate
if targetGroup, ok := nodePortTargetGroup[nodePort]; !ok {
// Create new Target group
targetName := createTargetName(namespacedName, frontendPort, nodePort)
targetGroup, err = c.ensureTargetGroup(
nil,
mapping,
instanceIDs,
targetName,
*loadBalancer.VpcId,
)
if err != nil {
return nil, err
}
// Associate new target group to LB
_, err := c.elbv2.ModifyListener(&elbv2.ModifyListenerInput{
ListenerArn: listener.ListenerArn,
Port: aws.Int64(frontendPort),
Protocol: aws.String("TCP"),
DefaultActions: []*elbv2.Action{{
TargetGroupArn: targetGroup.TargetGroupArn,
Type: aws.String("forward"),
}},
})
if err != nil {
return nil, fmt.Errorf("Error updating load balancer listener: %q", err)
}
// Delete old target group
_, err = c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{
TargetGroupArn: listener.DefaultActions[0].TargetGroupArn,
})
if err != nil {
return nil, fmt.Errorf("Error deleting old target group: %q", err)
}
} else {
// Run ensureTargetGroup to make sure instances in service are up-to-date
targetName := createTargetName(namespacedName, frontendPort, nodePort)
_, err = c.ensureTargetGroup(
targetGroup,
mapping,
instanceIDs,
targetName,
*loadBalancer.VpcId,
)
if err != nil {
return nil, err
}
}
dirty = true
continue
}
// Additions
_, targetGroupArn, err := c.createListenerV2(loadBalancer.LoadBalancerArn, mapping, namespacedName, instanceIDs, *loadBalancer.VpcId)
if err != nil {
return nil, err
}
addTagsInput.ResourceArns = append(addTagsInput.ResourceArns, targetGroupArn)
dirty = true
}
frontEndPorts := map[int64]bool{}
for i := range mappings {
frontEndPorts[mappings[i].FrontendPort] = true
}
// handle deletions
for port, listener := range actual {
if _, ok := frontEndPorts[port]; !ok {
err := c.deleteListenerV2(listener)
if err != nil {
return nil, err
}
dirty = true
}
}
// Add tags to new targets
for k, v := range tags {
addTagsInput.Tags = append(addTagsInput.Tags, &elbv2.Tag{
Key: aws.String(k), Value: aws.String(v),
})
}
if len(addTagsInput.ResourceArns) > 0 && len(addTagsInput.Tags) > 0 {
_, err = c.elbv2.AddTags(addTagsInput)
if err != nil {
return nil, fmt.Errorf("Error adding tags after modifying load balancer targets: %q", err)
}
}
}
desiredLoadBalancerAttributes := map[string]string{}
// Default values to ensured a remove annotation reverts back to the default
desiredLoadBalancerAttributes["load_balancing.cross_zone.enabled"] = "false"
// Determine if cross zone load balancing enabled/disabled has been specified
crossZoneLoadBalancingEnabledAnnotation := annotations[ServiceAnnotationLoadBalancerCrossZoneLoadBalancingEnabled]
if crossZoneLoadBalancingEnabledAnnotation != "" {
crossZoneEnabled, err := strconv.ParseBool(crossZoneLoadBalancingEnabledAnnotation)
if err != nil {
return nil, fmt.Errorf("error parsing service annotation: %s=%s",
ServiceAnnotationLoadBalancerCrossZoneLoadBalancingEnabled,
crossZoneLoadBalancingEnabledAnnotation,
)
}
if crossZoneEnabled {
desiredLoadBalancerAttributes["load_balancing.cross_zone.enabled"] = "true"
}
}
// Whether the ELB was new or existing, sync attributes regardless. This accounts for things
// that cannot be specified at the time of creation and can only be modified after the fact,
// e.g. idle connection timeout.
describeAttributesRequest := &elbv2.DescribeLoadBalancerAttributesInput{
LoadBalancerArn: loadBalancer.LoadBalancerArn,
}
describeAttributesOutput, err := c.elbv2.DescribeLoadBalancerAttributes(describeAttributesRequest)
if err != nil {
return nil, fmt.Errorf("Unable to retrieve load balancer attributes during attribute sync: %q", err)
}
changedAttributes := []*elbv2.LoadBalancerAttribute{}
// Identify to be changed attributes
for _, foundAttribute := range describeAttributesOutput.Attributes {
if targetValue, ok := desiredLoadBalancerAttributes[*foundAttribute.Key]; ok {
if targetValue != *foundAttribute.Value {
changedAttributes = append(changedAttributes, &elbv2.LoadBalancerAttribute{
Key: foundAttribute.Key,
Value: aws.String(targetValue),
})
}
}
}
// Update attributes requiring changes
if len(changedAttributes) > 0 {
klog.V(2).Infof("Updating load-balancer attributes for %q", loadBalancerName)
_, err = c.elbv2.ModifyLoadBalancerAttributes(&elbv2.ModifyLoadBalancerAttributesInput{
LoadBalancerArn: loadBalancer.LoadBalancerArn,
Attributes: changedAttributes,
})
if err != nil {
return nil, fmt.Errorf("Unable to update load balancer attributes during attribute sync: %q", err)
}
}
// Subnets cannot be modified on NLBs
if dirty {
loadBalancers, err := c.elbv2.DescribeLoadBalancers(
&elbv2.DescribeLoadBalancersInput{
LoadBalancerArns: []*string{
loadBalancer.LoadBalancerArn,
},
},
)
if err != nil {
return nil, fmt.Errorf("Error retrieving load balancer after update: %q", err)
}
loadBalancer = loadBalancers.LoadBalancers[0]
}
}
return loadBalancer, nil
}
// create a valid target group name - ensure name is not over 32 characters
func createTargetName(namespacedName types.NamespacedName, frontendPort, nodePort int64) string {
sha := fmt.Sprintf("%x", sha1.Sum([]byte(namespacedName.String())))[:13]
return fmt.Sprintf("k8s-tg-%s-%d-%d", sha, frontendPort, nodePort)
}
func (c *Cloud) createListenerV2(loadBalancerArn *string, mapping nlbPortMapping, namespacedName types.NamespacedName, instanceIDs []string, vpcID string) (listener *elbv2.Listener, targetGroupArn *string, err error) {
targetName := createTargetName(namespacedName, mapping.FrontendPort, mapping.TrafficPort)
klog.Infof("Creating load balancer target group for %v with name: %s", namespacedName, targetName)
target, err := c.ensureTargetGroup(
nil,
mapping,
instanceIDs,
targetName,
vpcID,
)
if err != nil {
return nil, aws.String(""), err
}
createListernerInput := &elbv2.CreateListenerInput{
LoadBalancerArn: loadBalancerArn,
Port: aws.Int64(mapping.FrontendPort),
Protocol: aws.String("TCP"),
DefaultActions: []*elbv2.Action{{
TargetGroupArn: target.TargetGroupArn,
Type: aws.String(elbv2.ActionTypeEnumForward),
}},
}
klog.Infof("Creating load balancer listener for %v", namespacedName)
createListenerOutput, err := c.elbv2.CreateListener(createListernerInput)
if err != nil {
return nil, aws.String(""), fmt.Errorf("Error creating load balancer listener: %q", err)
}
return createListenerOutput.Listeners[0], target.TargetGroupArn, nil
}
// cleans up listener and corresponding target group
func (c *Cloud) deleteListenerV2(listener *elbv2.Listener) error {
_, err := c.elbv2.DeleteListener(&elbv2.DeleteListenerInput{ListenerArn: listener.ListenerArn})
if err != nil {
return fmt.Errorf("Error deleting load balancer listener: %q", err)
}
_, err = c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{TargetGroupArn: listener.DefaultActions[0].TargetGroupArn})
if err != nil {
return fmt.Errorf("Error deleting load balancer target group: %q", err)
}
return nil
}
// ensureTargetGroup creates a target group with a set of instances
func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, mapping nlbPortMapping, instances []string, name string, vpcID string) (*elbv2.TargetGroup, error) {
dirty := false
if targetGroup == nil {
input := &elbv2.CreateTargetGroupInput{
VpcId: aws.String(vpcID),
Name: aws.String(name),
Port: aws.Int64(mapping.TrafficPort),
Protocol: aws.String("TCP"),
TargetType: aws.String("instance"),
HealthCheckIntervalSeconds: aws.Int64(30),
HealthCheckPort: aws.String("traffic-port"),
HealthCheckProtocol: aws.String("TCP"),
HealthyThresholdCount: aws.Int64(3),
UnhealthyThresholdCount: aws.Int64(3),
}
input.HealthCheckProtocol = aws.String(mapping.HealthCheckProtocol)
if mapping.HealthCheckProtocol != elbv2.ProtocolEnumTcp {
input.HealthCheckPath = aws.String(mapping.HealthCheckPath)
}
// Account for externalTrafficPolicy = "Local"
if mapping.HealthCheckPort != mapping.TrafficPort {
input.HealthCheckPort = aws.String(strconv.Itoa(int(mapping.HealthCheckPort)))
}
result, err := c.elbv2.CreateTargetGroup(input)
if err != nil {
return nil, fmt.Errorf("Error creating load balancer target group: %q", err)
}
if len(result.TargetGroups) != 1 {
return nil, fmt.Errorf("Expected only one target group on CreateTargetGroup, got %d groups", len(result.TargetGroups))
}
registerInput := &elbv2.RegisterTargetsInput{
TargetGroupArn: result.TargetGroups[0].TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for _, instanceID := range instances {
registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
Id: aws.String(string(instanceID)),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err = c.elbv2.RegisterTargets(registerInput)
if err != nil {
return nil, fmt.Errorf("Error registering targets for load balancer: %q", err)
}
return result.TargetGroups[0], nil
}
// handle instances in service
{
healthResponse, err := c.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: targetGroup.TargetGroupArn})
if err != nil {
return nil, fmt.Errorf("Error describing target group health: %q", err)
}
actualIDs := []string{}
for _, healthDescription := range healthResponse.TargetHealthDescriptions {
if healthDescription.TargetHealth.Reason != nil {
switch aws.StringValue(healthDescription.TargetHealth.Reason) {
case elbv2.TargetHealthReasonEnumTargetDeregistrationInProgress:
// We don't need to count this instance in service if it is
// on its way out
default:
actualIDs = append(actualIDs, *healthDescription.Target.Id)
}
}
}
actual := sets.NewString(actualIDs...)
expected := sets.NewString(instances...)
additions := expected.Difference(actual)
removals := actual.Difference(expected)
if len(additions) > 0 {
registerInput := &elbv2.RegisterTargetsInput{
TargetGroupArn: targetGroup.TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for instanceID := range additions {
registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err := c.elbv2.RegisterTargets(registerInput)
if err != nil {
return nil, fmt.Errorf("Error registering new targets in target group: %q", err)
}
dirty = true
}
if len(removals) > 0 {
deregisterInput := &elbv2.DeregisterTargetsInput{
TargetGroupArn: targetGroup.TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for instanceID := range removals {
deregisterInput.Targets = append(deregisterInput.Targets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err := c.elbv2.DeregisterTargets(deregisterInput)
if err != nil {
return nil, fmt.Errorf("Error trying to deregister targets in target group: %q", err)
}
dirty = true
}
}
// ensure the health check is correct
{
dirtyHealthCheck := false
input := &elbv2.ModifyTargetGroupInput{
TargetGroupArn: targetGroup.TargetGroupArn,
}
if aws.StringValue(targetGroup.HealthCheckProtocol) != mapping.HealthCheckProtocol {
input.HealthCheckProtocol = aws.String(mapping.HealthCheckProtocol)
dirtyHealthCheck = true
}
if aws.StringValue(targetGroup.HealthCheckPort) != strconv.Itoa(int(mapping.HealthCheckPort)) {
input.HealthCheckPort = aws.String(strconv.Itoa(int(mapping.HealthCheckPort)))
dirtyHealthCheck = true
}
if mapping.HealthCheckPath != "" && mapping.HealthCheckProtocol != elbv2.ProtocolEnumTcp {
input.HealthCheckPath = aws.String(mapping.HealthCheckPath)
dirtyHealthCheck = true
}
if dirtyHealthCheck {
_, err := c.elbv2.ModifyTargetGroup(input)
if err != nil {
return nil, fmt.Errorf("Error modifying target group health check: %q", err)
}
dirty = true
}
}
if dirty {
result, err := c.elbv2.DescribeTargetGroups(&elbv2.DescribeTargetGroupsInput{
Names: []*string{aws.String(name)},
})
if err != nil {
return nil, fmt.Errorf("Error retrieving target group after creation/update: %q", err)
}
targetGroup = result.TargetGroups[0]
}
return targetGroup, nil
}
func portsForNLB(lbName string, sg *ec2.SecurityGroup, clientTraffic bool) sets.Int64 {
response := sets.NewInt64()
var annotation string
if clientTraffic {
annotation = fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
} else {
annotation = fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
}
for i := range sg.IpPermissions {
for j := range sg.IpPermissions[i].IpRanges {
description := aws.StringValue(sg.IpPermissions[i].IpRanges[j].Description)
if description == annotation {
// TODO should probably check FromPort == ToPort
response.Insert(aws.Int64Value(sg.IpPermissions[i].FromPort))
}
}
}
return response
}
// filterForIPRangeDescription filters in security groups that have IpRange Descriptions that match a loadBalancerName
func filterForIPRangeDescription(securityGroups []*ec2.SecurityGroup, lbName string) []*ec2.SecurityGroup {
response := []*ec2.SecurityGroup{}
clientRule := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
healthRule := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
alreadyAdded := sets.NewString()
for i := range securityGroups {
for j := range securityGroups[i].IpPermissions {
for k := range securityGroups[i].IpPermissions[j].IpRanges {
description := aws.StringValue(securityGroups[i].IpPermissions[j].IpRanges[k].Description)
if description == clientRule || description == healthRule {
sgIDString := aws.StringValue(securityGroups[i].GroupId)
if !alreadyAdded.Has(sgIDString) {
response = append(response, securityGroups[i])
alreadyAdded.Insert(sgIDString)
}
}
}
}
}
return response
}
func (c *Cloud) getVpcCidrBlocks() ([]string, error) {
vpcs, err := c.ec2.DescribeVpcs(&ec2.DescribeVpcsInput{
VpcIds: []*string{aws.String(c.vpcID)},
})
if err != nil {
return nil, fmt.Errorf("Error querying VPC for ELB: %q", err)
}
if len(vpcs.Vpcs) != 1 {
return nil, fmt.Errorf("Error querying VPC for ELB, got %d vpcs for %s", len(vpcs.Vpcs), c.vpcID)
}
cidrBlocks := make([]string, 0, len(vpcs.Vpcs[0].CidrBlockAssociationSet))
for _, cidr := range vpcs.Vpcs[0].CidrBlockAssociationSet {
cidrBlocks = append(cidrBlocks, aws.StringValue(cidr.CidrBlock))
}
return cidrBlocks, nil
}
// abstraction for updating SG rules
// if clientTraffic is false, then only update HealthCheck rules
func (c *Cloud) updateInstanceSecurityGroupsForNLBTraffic(actualGroups []*ec2.SecurityGroup, desiredSgIds []string, ports []int64, lbName string, clientCidrs []string, clientTraffic bool) error {
klog.V(8).Infof("updateInstanceSecurityGroupsForNLBTraffic: actualGroups=%v, desiredSgIds=%v, ports=%v, clientTraffic=%v", actualGroups, desiredSgIds, ports, clientTraffic)
// Map containing the groups we want to make changes on; the ports to make
// changes on; and whether to add or remove it. true to add, false to remove
portChanges := map[string]map[int64]bool{}
for _, id := range desiredSgIds {
// consider everything an addition for now
if _, ok := portChanges[id]; !ok {
portChanges[id] = make(map[int64]bool)
}
for _, port := range ports {
portChanges[id][port] = true
}
}
// Compare to actual groups
for _, actualGroup := range actualGroups {
actualGroupID := aws.StringValue(actualGroup.GroupId)
if actualGroupID == "" {
klog.Warning("Ignoring group without ID: ", actualGroup)
continue
}
addingMap, ok := portChanges[actualGroupID]
if ok {
desiredSet := sets.NewInt64()
for port := range addingMap {
desiredSet.Insert(port)
}
existingSet := portsForNLB(lbName, actualGroup, clientTraffic)
// remove from portChanges ports that are already allowed
if intersection := desiredSet.Intersection(existingSet); intersection.Len() > 0 {
for p := range intersection {
delete(portChanges[actualGroupID], p)
}
}
// allowed ports that need to be removed
if difference := existingSet.Difference(desiredSet); difference.Len() > 0 {
for p := range difference {
portChanges[actualGroupID][p] = false
}
}
}
}
// Make changes we've planned on
for instanceSecurityGroupID, portMap := range portChanges {
adds := []*ec2.IpPermission{}
removes := []*ec2.IpPermission{}
for port, add := range portMap {
if add {
if clientTraffic {
klog.V(2).Infof("Adding rule for client MTU discovery from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID)
klog.V(2).Infof("Adding rule for client traffic from the network load balancer (%s) to instances (%s), port (%v)", clientCidrs, instanceSecurityGroupID, port)
} else {
klog.V(2).Infof("Adding rule for health check traffic from the network load balancer (%s) to instances (%s), port (%v)", clientCidrs, instanceSecurityGroupID, port)
}
} else {
if clientTraffic {
klog.V(2).Infof("Removing rule for client MTU discovery from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID)
klog.V(2).Infof("Removing rule for client traffic from the network load balancer (%s) to instance (%s), port (%v)", clientCidrs, instanceSecurityGroupID, port)
}
klog.V(2).Infof("Removing rule for health check traffic from the network load balancer (%s) to instance (%s), port (%v)", clientCidrs, instanceSecurityGroupID, port)
}
if clientTraffic {
clientRuleAnnotation := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
// Client Traffic
permission := &ec2.IpPermission{
FromPort: aws.Int64(port),
ToPort: aws.Int64(port),
IpProtocol: aws.String("tcp"),
}
ranges := []*ec2.IpRange{}
for _, cidr := range clientCidrs {
ranges = append(ranges, &ec2.IpRange{
CidrIp: aws.String(cidr),
Description: aws.String(clientRuleAnnotation),
})
}
permission.IpRanges = ranges
if add {
adds = append(adds, permission)
} else {
removes = append(removes, permission)
}
} else {
healthRuleAnnotation := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
// NLB HealthCheck
permission := &ec2.IpPermission{
FromPort: aws.Int64(port),
ToPort: aws.Int64(port),
IpProtocol: aws.String("tcp"),
}
ranges := []*ec2.IpRange{}
for _, cidr := range clientCidrs {
ranges = append(ranges, &ec2.IpRange{
CidrIp: aws.String(cidr),
Description: aws.String(healthRuleAnnotation),
})
}
permission.IpRanges = ranges
if add {
adds = append(adds, permission)
} else {
removes = append(removes, permission)
}
}
}
if len(adds) > 0 {
changed, err := c.addSecurityGroupIngress(instanceSecurityGroupID, adds)
if err != nil {
return err
}
if !changed {
klog.Warning("Allowing ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
}
}
if len(removes) > 0 {
changed, err := c.removeSecurityGroupIngress(instanceSecurityGroupID, removes)
if err != nil {
return err
}
if !changed {
klog.Warning("Revoking ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
}
}
if clientTraffic {
// MTU discovery
mtuRuleAnnotation := fmt.Sprintf("%s=%s", NLBMtuDiscoveryRuleDescription, lbName)
mtuPermission := &ec2.IpPermission{
IpProtocol: aws.String("icmp"),
FromPort: aws.Int64(3),
ToPort: aws.Int64(4),
}
ranges := []*ec2.IpRange{}
for _, cidr := range clientCidrs {
ranges = append(ranges, &ec2.IpRange{
CidrIp: aws.String(cidr),
Description: aws.String(mtuRuleAnnotation),
})
}
mtuPermission.IpRanges = ranges
group, err := c.findSecurityGroup(instanceSecurityGroupID)
if err != nil {
klog.Warningf("Error retrieving security group: %q", err)
return err
}
if group == nil {
klog.Warning("Security group not found: ", instanceSecurityGroupID)
return nil
}
icmpExists := false
permCount := 0
for _, perm := range group.IpPermissions {
if *perm.IpProtocol == "icmp" {
icmpExists = true
continue
}
if perm.FromPort != nil {
permCount++
}
}
if !icmpExists && permCount > 0 {
// the icmp permission is missing
changed, err := c.addSecurityGroupIngress(instanceSecurityGroupID, []*ec2.IpPermission{mtuPermission})
if err != nil {
klog.Warningf("Error adding MTU permission to security group: %q", err)
return err
}
if !changed {
klog.Warning("Allowing ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
}
} else if icmpExists && permCount == 0 {
// there is no additional permissions, remove icmp
changed, err := c.removeSecurityGroupIngress(instanceSecurityGroupID, []*ec2.IpPermission{mtuPermission})
if err != nil {
klog.Warningf("Error removing MTU permission to security group: %q", err)
return err
}
if !changed {
klog.Warning("Revoking ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
}
}
}
}
return nil
}
// Add SG rules for a given NLB
func (c *Cloud) updateInstanceSecurityGroupsForNLB(mappings []nlbPortMapping, instances map[InstanceID]*ec2.Instance, lbName string, clientCidrs []string) error {
if c.cfg.Global.DisableSecurityGroupIngress {
return nil
}
vpcCidrBlocks, err := c.getVpcCidrBlocks()
if err != nil {
return err
}
// Unlike the classic ELB, NLB does not have a security group that we can
// filter against all existing groups to see if they allow access. Instead
// we use the IpRange.Description field to annotate NLB health check and
// client traffic rules
// Get the actual list of groups that allow ingress for the load-balancer
var actualGroups []*ec2.SecurityGroup
{
// Server side filter
describeRequest := &ec2.DescribeSecurityGroupsInput{}
filters := []*ec2.Filter{
newEc2Filter("ip-permission.protocol", "tcp"),
}
describeRequest.Filters = c.tagging.addFilters(filters)
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
if err != nil {
return fmt.Errorf("Error querying security groups for NLB: %q", err)
}
for _, sg := range response {
if !c.tagging.hasClusterTag(sg.Tags) {
continue
}
actualGroups = append(actualGroups, sg)
}
// client-side filter
// Filter out groups that don't have IP Rules we've annotated for this service
actualGroups = filterForIPRangeDescription(actualGroups, lbName)
}
taggedSecurityGroups, err := c.getTaggedSecurityGroups()
if err != nil {
return fmt.Errorf("Error querying for tagged security groups: %q", err)
}
externalTrafficPolicyIsLocal := false
trafficPorts := []int64{}
for i := range mappings {
trafficPorts = append(trafficPorts, mappings[i].TrafficPort)
if mappings[i].TrafficPort != mappings[i].HealthCheckPort {
externalTrafficPolicyIsLocal = true
}
}
healthCheckPorts := trafficPorts
// if externalTrafficPolicy is Local, all listeners use the same health
// check port
if externalTrafficPolicyIsLocal && len(mappings) > 0 {
healthCheckPorts = []int64{mappings[0].HealthCheckPort}
}
desiredGroupIds := []string{}
// Scan instances for groups we want open
for _, instance := range instances {
securityGroup, err := findSecurityGroupForInstance(instance, taggedSecurityGroups)
if err != nil {
return err
}
if securityGroup == nil {
klog.Warningf("Ignoring instance without security group: %s", aws.StringValue(instance.InstanceId))
continue
}
id := aws.StringValue(securityGroup.GroupId)
if id == "" {
klog.Warningf("found security group without id: %v", securityGroup)
continue
}
desiredGroupIds = append(desiredGroupIds, id)
}
// Run once for Client traffic
err = c.updateInstanceSecurityGroupsForNLBTraffic(actualGroups, desiredGroupIds, trafficPorts, lbName, clientCidrs, true)
if err != nil {
return err
}
// Run once for health check traffic
err = c.updateInstanceSecurityGroupsForNLBTraffic(actualGroups, desiredGroupIds, healthCheckPorts, lbName, vpcCidrBlocks, false)
if err != nil {
return err
}
return nil
}
func (c *Cloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBalancerName string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB, proxyProtocol bool, loadBalancerAttributes *elb.LoadBalancerAttributes, annotations map[string]string) (*elb.LoadBalancerDescription, error) {
loadBalancer, err := c.describeLoadBalancer(loadBalancerName)
if err != nil {
return nil, err
}
dirty := false
if loadBalancer == nil {
createRequest := &elb.CreateLoadBalancerInput{}
createRequest.LoadBalancerName = aws.String(loadBalancerName)
createRequest.Listeners = listeners
if internalELB {
createRequest.Scheme = aws.String("internal")
}
// We are supposed to specify one subnet per AZ.
// TODO: What happens if we have more than one subnet per AZ?
if subnetIDs == nil {
createRequest.Subnets = nil
} else {
createRequest.Subnets = aws.StringSlice(subnetIDs)
}
if securityGroupIDs == nil {
createRequest.SecurityGroups = nil
} else {
createRequest.SecurityGroups = aws.StringSlice(securityGroupIDs)
}
// Get additional tags set by the user
tags := getLoadBalancerAdditionalTags(annotations)
// Add default tags
tags[TagNameKubernetesService] = namespacedName.String()
tags = c.tagging.buildTags(ResourceLifecycleOwned, tags)
for k, v := range tags {
createRequest.Tags = append(createRequest.Tags, &elb.Tag{
Key: aws.String(k), Value: aws.String(v),
})
}
klog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName)
_, err := c.elb.CreateLoadBalancer(createRequest)
if err != nil {
return nil, err
}
if proxyProtocol {
err = c.createProxyProtocolPolicy(loadBalancerName)
if err != nil {
return nil, err
}
for _, listener := range listeners {
klog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to true", *listener.InstancePort)
err := c.setBackendPolicies(loadBalancerName, *listener.InstancePort, []*string{aws.String(ProxyProtocolPolicyName)})
if err != nil {
return nil, err
}
}
}
dirty = true
} else {
// TODO: Sync internal vs non-internal
{
// Sync subnets
expected := sets.NewString(subnetIDs...)
actual := stringSetFromPointers(loadBalancer.Subnets)
additions := expected.Difference(actual)
removals := actual.Difference(expected)
if removals.Len() != 0 {
request := &elb.DetachLoadBalancerFromSubnetsInput{}
request.LoadBalancerName = aws.String(loadBalancerName)
request.Subnets = stringSetToPointers(removals)
klog.V(2).Info("Detaching load balancer from removed subnets")
_, err := c.elb.DetachLoadBalancerFromSubnets(request)
if err != nil {
return nil, fmt.Errorf("error detaching AWS loadbalancer from subnets: %q", err)
}
dirty = true
}
if additions.Len() != 0 {
request := &elb.AttachLoadBalancerToSubnetsInput{}
request.LoadBalancerName = aws.String(loadBalancerName)
request.Subnets = stringSetToPointers(additions)
klog.V(2).Info("Attaching load balancer to added subnets")
_, err := c.elb.AttachLoadBalancerToSubnets(request)
if err != nil {
return nil, fmt.Errorf("error attaching AWS loadbalancer to subnets: %q", err)
}
dirty = true
}
}
{
// Sync security groups
expected := sets.NewString(securityGroupIDs...)
actual := stringSetFromPointers(loadBalancer.SecurityGroups)
if !expected.Equal(actual) {
// This call just replaces the security groups, unlike e.g. subnets (!)
request := &elb.ApplySecurityGroupsToLoadBalancerInput{}
request.LoadBalancerName = aws.String(loadBalancerName)
if securityGroupIDs == nil {
request.SecurityGroups = nil
} else {
request.SecurityGroups = aws.StringSlice(securityGroupIDs)
}
klog.V(2).Info("Applying updated security groups to load balancer")
_, err := c.elb.ApplySecurityGroupsToLoadBalancer(request)
if err != nil {
return nil, fmt.Errorf("error applying AWS loadbalancer security groups: %q", err)
}
dirty = true
}
}
{
// Sync listeners
listenerDescriptions := loadBalancer.ListenerDescriptions
foundSet := make(map[int]bool)
removals := []*int64{}
for _, listenerDescription := range listenerDescriptions {
actual := listenerDescription.Listener
if actual == nil {
klog.Warning("Ignoring empty listener in AWS loadbalancer: ", loadBalancerName)
continue
}
found := -1
for i, expected := range listeners {
if elbProtocolsAreEqual(actual.Protocol, expected.Protocol) {
continue
}
if elbProtocolsAreEqual(actual.InstanceProtocol, expected.InstanceProtocol) {
continue
}
if aws.Int64Value(actual.InstancePort) != aws.Int64Value(expected.InstancePort) {
continue
}
if aws.Int64Value(actual.LoadBalancerPort) != aws.Int64Value(expected.LoadBalancerPort) {
continue
}
if awsArnEquals(actual.SSLCertificateId, expected.SSLCertificateId) {
continue
}
found = i
}
if found != -1 {
foundSet[found] = true
} else {
removals = append(removals, actual.LoadBalancerPort)
}
}
additions := []*elb.Listener{}
for i := range listeners {
if foundSet[i] {
continue
}
additions = append(additions, listeners[i])
}
if len(removals) != 0 {
request := &elb.DeleteLoadBalancerListenersInput{}
request.LoadBalancerName = aws.String(loadBalancerName)
request.LoadBalancerPorts = removals
klog.V(2).Info("Deleting removed load balancer listeners")
_, err := c.elb.DeleteLoadBalancerListeners(request)
if err != nil {
return nil, fmt.Errorf("error deleting AWS loadbalancer listeners: %q", err)
}
dirty = true
}
if len(additions) != 0 {
request := &elb.CreateLoadBalancerListenersInput{}
request.LoadBalancerName = aws.String(loadBalancerName)
request.Listeners = additions
klog.V(2).Info("Creating added load balancer listeners")
_, err := c.elb.CreateLoadBalancerListeners(request)
if err != nil {
return nil, fmt.Errorf("error creating AWS loadbalancer listeners: %q", err)
}
dirty = true
}
}
{
// Sync proxy protocol state for new and existing listeners
proxyPolicies := make([]*string, 0)
if proxyProtocol {
// Ensure the backend policy exists
// NOTE The documentation for the AWS API indicates we could get an HTTP 400
// back if a policy of the same name already exists. However, the aws-sdk does not
// seem to return an error to us in these cases. Therefore, this will issue an API
// request every time.
err := c.createProxyProtocolPolicy(loadBalancerName)
if err != nil {
return nil, err
}
proxyPolicies = append(proxyPolicies, aws.String(ProxyProtocolPolicyName))
}
foundBackends := make(map[int64]bool)
proxyProtocolBackends := make(map[int64]bool)
for _, backendListener := range loadBalancer.BackendServerDescriptions {
foundBackends[*backendListener.InstancePort] = false
proxyProtocolBackends[*backendListener.InstancePort] = proxyProtocolEnabled(backendListener)
}
for _, listener := range listeners {
setPolicy := false
instancePort := *listener.InstancePort
if currentState, ok := proxyProtocolBackends[instancePort]; !ok {
// This is a new ELB backend so we only need to worry about
// potentially adding a policy and not removing an
// existing one
setPolicy = proxyProtocol
} else {
foundBackends[instancePort] = true
// This is an existing ELB backend so we need to determine
// if the state changed
setPolicy = (currentState != proxyProtocol)
}
if setPolicy {
klog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to %t", instancePort, proxyProtocol)
err := c.setBackendPolicies(loadBalancerName, instancePort, proxyPolicies)
if err != nil {
return nil, err
}
dirty = true
}
}
// We now need to figure out if any backend policies need removed
// because these old policies will stick around even if there is no
// corresponding listener anymore
for instancePort, found := range foundBackends {
if !found {
klog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to false", instancePort)
err := c.setBackendPolicies(loadBalancerName, instancePort, []*string{})
if err != nil {
return nil, err
}
dirty = true
}
}
}
{
// Add additional tags
klog.V(2).Infof("Creating additional load balancer tags for %s", loadBalancerName)
tags := getLoadBalancerAdditionalTags(annotations)
if len(tags) > 0 {
err := c.addLoadBalancerTags(loadBalancerName, tags)
if err != nil {
return nil, fmt.Errorf("unable to create additional load balancer tags: %v", err)
}
}
}
}
// Whether the ELB was new or existing, sync attributes regardless. This accounts for things
// that cannot be specified at the time of creation and can only be modified after the fact,
// e.g. idle connection timeout.
{
describeAttributesRequest := &elb.DescribeLoadBalancerAttributesInput{}
describeAttributesRequest.LoadBalancerName = aws.String(loadBalancerName)
describeAttributesOutput, err := c.elb.DescribeLoadBalancerAttributes(describeAttributesRequest)
if err != nil {
klog.Warning("Unable to retrieve load balancer attributes during attribute sync")
return nil, err
}
foundAttributes := &describeAttributesOutput.LoadBalancerAttributes
// Update attributes if they're dirty
if !reflect.DeepEqual(loadBalancerAttributes, foundAttributes) {
klog.V(2).Infof("Updating load-balancer attributes for %q", loadBalancerName)
modifyAttributesRequest := &elb.ModifyLoadBalancerAttributesInput{}
modifyAttributesRequest.LoadBalancerName = aws.String(loadBalancerName)
modifyAttributesRequest.LoadBalancerAttributes = loadBalancerAttributes
_, err = c.elb.ModifyLoadBalancerAttributes(modifyAttributesRequest)
if err != nil {
return nil, fmt.Errorf("Unable to update load balancer attributes during attribute sync: %q", err)
}
dirty = true
}
}
if dirty {
loadBalancer, err = c.describeLoadBalancer(loadBalancerName)
if err != nil {
klog.Warning("Unable to retrieve load balancer after creation/update")
return nil, err
}
}
return loadBalancer, nil
}
func createSubnetMappings(subnetIDs []string) []*elbv2.SubnetMapping {
response := []*elbv2.SubnetMapping{}
for _, id := range subnetIDs {
// Ignore AllocationId for now
response = append(response, &elbv2.SubnetMapping{SubnetId: aws.String(id)})
}
return response
}
// elbProtocolsAreEqual checks if two ELB protocol strings are considered the same
// Comparison is case insensitive
func elbProtocolsAreEqual(l, r *string) bool {
if l == nil || r == nil {
return l == r
}
return strings.EqualFold(aws.StringValue(l), aws.StringValue(r))
}
// awsArnEquals checks if two ARN strings are considered the same
// Comparison is case insensitive
func awsArnEquals(l, r *string) bool {
if l == nil || r == nil {
return l == r
}
return strings.EqualFold(aws.StringValue(l), aws.StringValue(r))
}
// getExpectedHealthCheck returns an elb.Healthcheck for the provided target
// and using either sensible defaults or overrides via Service annotations
func (c *Cloud) getExpectedHealthCheck(target string, annotations map[string]string) (*elb.HealthCheck, error) {
healthcheck := &elb.HealthCheck{Target: &target}
getOrDefault := func(annotation string, defaultValue int64) (*int64, error) {
i64 := defaultValue
var err error
if s, ok := annotations[annotation]; ok {
i64, err = strconv.ParseInt(s, 10, 0)
if err != nil {
return nil, fmt.Errorf("failed parsing health check annotation value: %v", err)
}
}
return &i64, nil
}
var err error
healthcheck.HealthyThreshold, err = getOrDefault(ServiceAnnotationLoadBalancerHCHealthyThreshold, defaultHCHealthyThreshold)
if err != nil {
return nil, err
}
healthcheck.UnhealthyThreshold, err = getOrDefault(ServiceAnnotationLoadBalancerHCUnhealthyThreshold, defaultHCUnhealthyThreshold)
if err != nil {
return nil, err
}
healthcheck.Timeout, err = getOrDefault(ServiceAnnotationLoadBalancerHCTimeout, defaultHCTimeout)
if err != nil {
return nil, err
}
healthcheck.Interval, err = getOrDefault(ServiceAnnotationLoadBalancerHCInterval, defaultHCInterval)
if err != nil {
return nil, err
}
if err = healthcheck.Validate(); err != nil {
return nil, fmt.Errorf("some of the load balancer health check parameters are invalid: %v", err)
}
return healthcheck, nil
}
// Makes sure that the health check for an ELB matches the configured health check node port
func (c *Cloud) ensureLoadBalancerHealthCheck(loadBalancer *elb.LoadBalancerDescription, protocol string, port int32, path string, annotations map[string]string) error {
name := aws.StringValue(loadBalancer.LoadBalancerName)
actual := loadBalancer.HealthCheck
expectedTarget := protocol + ":" + strconv.FormatInt(int64(port), 10) + path
expected, err := c.getExpectedHealthCheck(expectedTarget, annotations)
if err != nil {
return fmt.Errorf("cannot update health check for load balancer %q: %q", name, err)
}
// comparing attributes 1 by 1 to avoid breakage in case a new field is
// added to the HC which breaks the equality
if aws.StringValue(expected.Target) == aws.StringValue(actual.Target) &&
aws.Int64Value(expected.HealthyThreshold) == aws.Int64Value(actual.HealthyThreshold) &&
aws.Int64Value(expected.UnhealthyThreshold) == aws.Int64Value(actual.UnhealthyThreshold) &&
aws.Int64Value(expected.Interval) == aws.Int64Value(actual.Interval) &&
aws.Int64Value(expected.Timeout) == aws.Int64Value(actual.Timeout) {
return nil
}
request := &elb.ConfigureHealthCheckInput{}
request.HealthCheck = expected
request.LoadBalancerName = loadBalancer.LoadBalancerName
_, err = c.elb.ConfigureHealthCheck(request)
if err != nil {
return fmt.Errorf("error configuring load balancer health check for %q: %q", name, err)
}
return nil
}
// Makes sure that exactly the specified hosts are registered as instances with the load balancer
func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instanceIDs map[InstanceID]*ec2.Instance) error {
expected := sets.NewString()
for id := range instanceIDs {
expected.Insert(string(id))
}
actual := sets.NewString()
for _, lbInstance := range lbInstances {
actual.Insert(aws.StringValue(lbInstance.InstanceId))
}
additions := expected.Difference(actual)
removals := actual.Difference(expected)
addInstances := []*elb.Instance{}
for _, instanceID := range additions.List() {
addInstance := &elb.Instance{}
addInstance.InstanceId = aws.String(instanceID)
addInstances = append(addInstances, addInstance)
}
removeInstances := []*elb.Instance{}
for _, instanceID := range removals.List() {
removeInstance := &elb.Instance{}
removeInstance.InstanceId = aws.String(instanceID)
removeInstances = append(removeInstances, removeInstance)
}
if len(addInstances) > 0 {
registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{}
registerRequest.Instances = addInstances
registerRequest.LoadBalancerName = aws.String(loadBalancerName)
_, err := c.elb.RegisterInstancesWithLoadBalancer(registerRequest)
if err != nil {
return err
}
klog.V(1).Infof("Instances added to load-balancer %s", loadBalancerName)
}
if len(removeInstances) > 0 {
deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{}
deregisterRequest.Instances = removeInstances
deregisterRequest.LoadBalancerName = aws.String(loadBalancerName)
_, err := c.elb.DeregisterInstancesFromLoadBalancer(deregisterRequest)
if err != nil {
return err
}
klog.V(1).Infof("Instances removed from load-balancer %s", loadBalancerName)
}
return nil
}
func (c *Cloud) getLoadBalancerTLSPorts(loadBalancer *elb.LoadBalancerDescription) []int64 {
ports := []int64{}
for _, listenerDescription := range loadBalancer.ListenerDescriptions {
protocol := aws.StringValue(listenerDescription.Listener.Protocol)
if protocol == "SSL" || protocol == "HTTPS" {
ports = append(ports, aws.Int64Value(listenerDescription.Listener.LoadBalancerPort))
}
}
return ports
}
func (c *Cloud) ensureSSLNegotiationPolicy(loadBalancer *elb.LoadBalancerDescription, policyName string) error {
klog.V(2).Info("Describing load balancer policies on load balancer")
result, err := c.elb.DescribeLoadBalancerPolicies(&elb.DescribeLoadBalancerPoliciesInput{
LoadBalancerName: loadBalancer.LoadBalancerName,
PolicyNames: []*string{
aws.String(fmt.Sprintf(SSLNegotiationPolicyNameFormat, policyName)),
},
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case elb.ErrCodePolicyNotFoundException:
default:
return fmt.Errorf("error describing security policies on load balancer: %q", err)
}
}
}
if len(result.PolicyDescriptions) > 0 {
return nil
}
klog.V(2).Infof("Creating SSL negotiation policy '%s' on load balancer", fmt.Sprintf(SSLNegotiationPolicyNameFormat, policyName))
// there is an upper limit of 98 policies on an ELB, we're pretty safe from
// running into it
_, err = c.elb.CreateLoadBalancerPolicy(&elb.CreateLoadBalancerPolicyInput{
LoadBalancerName: loadBalancer.LoadBalancerName,
PolicyName: aws.String(fmt.Sprintf(SSLNegotiationPolicyNameFormat, policyName)),
PolicyTypeName: aws.String("SSLNegotiationPolicyType"),
PolicyAttributes: []*elb.PolicyAttribute{
{
AttributeName: aws.String("Reference-Security-Policy"),
AttributeValue: aws.String(policyName),
},
},
})
if err != nil {
return fmt.Errorf("error creating security policy on load balancer: %q", err)
}
return nil
}
func (c *Cloud) setSSLNegotiationPolicy(loadBalancerName, sslPolicyName string, port int64) error {
policyName := fmt.Sprintf(SSLNegotiationPolicyNameFormat, sslPolicyName)
request := &elb.SetLoadBalancerPoliciesOfListenerInput{
LoadBalancerName: aws.String(loadBalancerName),
LoadBalancerPort: aws.Int64(port),
PolicyNames: []*string{
aws.String(policyName),
},
}
klog.V(2).Infof("Setting SSL negotiation policy '%s' on load balancer", policyName)
_, err := c.elb.SetLoadBalancerPoliciesOfListener(request)
if err != nil {
return fmt.Errorf("error setting SSL negotiation policy '%s' on load balancer: %q", policyName, err)
}
return nil
}
func (c *Cloud) createProxyProtocolPolicy(loadBalancerName string) error {
request := &elb.CreateLoadBalancerPolicyInput{
LoadBalancerName: aws.String(loadBalancerName),
PolicyName: aws.String(ProxyProtocolPolicyName),
PolicyTypeName: aws.String("ProxyProtocolPolicyType"),
PolicyAttributes: []*elb.PolicyAttribute{
{
AttributeName: aws.String("ProxyProtocol"),
AttributeValue: aws.String("true"),
},
},
}
klog.V(2).Info("Creating proxy protocol policy on load balancer")
_, err := c.elb.CreateLoadBalancerPolicy(request)
if err != nil {
return fmt.Errorf("error creating proxy protocol policy on load balancer: %q", err)
}
return nil
}
func (c *Cloud) setBackendPolicies(loadBalancerName string, instancePort int64, policies []*string) error {
request := &elb.SetLoadBalancerPoliciesForBackendServerInput{
InstancePort: aws.Int64(instancePort),
LoadBalancerName: aws.String(loadBalancerName),
PolicyNames: policies,
}
if len(policies) > 0 {
klog.V(2).Infof("Adding AWS loadbalancer backend policies on node port %d", instancePort)
} else {
klog.V(2).Infof("Removing AWS loadbalancer backend policies on node port %d", instancePort)
}
_, err := c.elb.SetLoadBalancerPoliciesForBackendServer(request)
if err != nil {
return fmt.Errorf("error adjusting AWS loadbalancer backend policies: %q", err)
}
return nil
}
func proxyProtocolEnabled(backend *elb.BackendServerDescription) bool {
for _, policy := range backend.PolicyNames {
if aws.StringValue(policy) == ProxyProtocolPolicyName {
return true
}
}
return false
}
// findInstancesForELB gets the EC2 instances corresponding to the Nodes, for setting up an ELB
// We ignore Nodes (with a log message) where the instanceid cannot be determined from the provider,
// and we ignore instances which are not found
func (c *Cloud) findInstancesForELB(nodes []*v1.Node) (map[InstanceID]*ec2.Instance, error) {
// Map to instance ids ignoring Nodes where we cannot find the id (but logging)
instanceIDs := mapToAWSInstanceIDsTolerant(nodes)
cacheCriteria := cacheCriteria{
// MaxAge not required, because we only care about security groups, which should not change
HasInstances: instanceIDs, // Refresh if any of the instance ids are missing
}
snapshot, err := c.instanceCache.describeAllInstancesCached(cacheCriteria)
if err != nil {
return nil, err
}
instances := snapshot.FindInstances(instanceIDs)
// We ignore instances that cannot be found
return instances, nil
}