| /* |
| Copyright 2017 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package aws |
| |
| import ( |
| "fmt" |
| "strings" |
| |
| "github.com/aws/aws-sdk-go/aws" |
| "github.com/aws/aws-sdk-go/service/ec2" |
| "k8s.io/klog" |
| |
| "k8s.io/apimachinery/pkg/util/wait" |
| ) |
| |
// TagNameKubernetesClusterPrefix is the tag name prefix we use to differentiate multiple
// logically independent clusters running in the same AZ.
// The tag key is TagNameKubernetesClusterPrefix + clusterID (see clusterTagKey).
// The tag value is an ownership value: buildTags writes the ResourceLifecycle
// it was given, converted to a string.
const TagNameKubernetesClusterPrefix = "kubernetes.io/cluster/"
| |
// TagNameKubernetesClusterLegacy is the legacy tag name we use to differentiate multiple
// logically independent clusters running in the same AZ. The tag value is the
// cluster ID itself. The problem with it was that it did not allow shared
// resources.
const TagNameKubernetesClusterLegacy = "KubernetesCluster"
| |
// ResourceLifecycle is the cluster lifecycle state used in tagging.
// buildTags stores it, converted to a string, as the value of the
// TagNameKubernetesClusterPrefix + clusterID tag.
type ResourceLifecycle string

// Note: the constants below are declared without an explicit type, so they are
// untyped string constants that convert implicitly to either string or
// ResourceLifecycle at the point of use.
const (
	// ResourceLifecycleOwned is the value we use when tagging resources to indicate
	// that the resource is considered owned and managed by the cluster,
	// and in particular that the lifecycle is tied to the lifecycle of the cluster.
	ResourceLifecycleOwned = "owned"
	// ResourceLifecycleShared is the value we use when tagging resources to indicate
	// that the resource is shared between multiple clusters, and should not be destroyed
	// if the cluster is destroyed.
	ResourceLifecycleShared = "shared"
)
| |
// awsTagging holds the cluster identity used to tag AWS resources and to
// filter/recognize resources that carry this cluster's tags.
type awsTagging struct {
	// ClusterID is our cluster identifier: we tag AWS resources with this value,
	// and thus we can run two independent clusters in the same VPC or subnets.
	// This gives us similar functionality to GCE projects.
	// When empty, hasClusterTag accepts every resource, addFilters adds no tag
	// filter, and buildTags writes no cluster tags.
	ClusterID string

	// usesLegacyTags is true if we are using the legacy TagNameKubernetesClusterLegacy tags.
	// It is set by init when a non-empty legacy cluster ID is supplied, and makes
	// buildTags emit the legacy tag in addition to the prefixed cluster tag.
	usesLegacyTags bool
}
| |
| func (t *awsTagging) init(legacyClusterID string, clusterID string) error { |
| if legacyClusterID != "" { |
| if clusterID != "" && legacyClusterID != clusterID { |
| return fmt.Errorf("ClusterID tags did not match: %q vs %q", clusterID, legacyClusterID) |
| } |
| t.usesLegacyTags = true |
| clusterID = legacyClusterID |
| } |
| |
| t.ClusterID = clusterID |
| |
| if clusterID != "" { |
| klog.Infof("AWS cloud filtering on ClusterID: %v", clusterID) |
| } else { |
| return fmt.Errorf("AWS cloud failed to find ClusterID") |
| } |
| |
| return nil |
| } |
| |
| // Extracts a clusterID from the given tags, if one is present |
| // If no clusterID is found, returns "", nil |
| // If multiple (different) clusterIDs are found, returns an error |
| func (t *awsTagging) initFromTags(tags []*ec2.Tag) error { |
| legacyClusterID, newClusterID, err := findClusterIDs(tags) |
| if err != nil { |
| return err |
| } |
| |
| if legacyClusterID == "" && newClusterID == "" { |
| klog.Errorf("Tag %q nor %q not found; Kubernetes may behave unexpectedly.", TagNameKubernetesClusterLegacy, TagNameKubernetesClusterPrefix+"...") |
| } |
| |
| return t.init(legacyClusterID, newClusterID) |
| } |
| |
| // Extracts the legacy & new cluster ids from the given tags, if they are present |
| // If duplicate tags are found, returns an error |
| func findClusterIDs(tags []*ec2.Tag) (string, string, error) { |
| legacyClusterID := "" |
| newClusterID := "" |
| |
| for _, tag := range tags { |
| tagKey := aws.StringValue(tag.Key) |
| if strings.HasPrefix(tagKey, TagNameKubernetesClusterPrefix) { |
| id := strings.TrimPrefix(tagKey, TagNameKubernetesClusterPrefix) |
| if newClusterID != "" { |
| return "", "", fmt.Errorf("Found multiple cluster tags with prefix %s (%q and %q)", TagNameKubernetesClusterPrefix, newClusterID, id) |
| } |
| newClusterID = id |
| } |
| |
| if tagKey == TagNameKubernetesClusterLegacy { |
| id := aws.StringValue(tag.Value) |
| if legacyClusterID != "" { |
| return "", "", fmt.Errorf("Found multiple %s tags (%q and %q)", TagNameKubernetesClusterLegacy, legacyClusterID, id) |
| } |
| legacyClusterID = id |
| } |
| } |
| |
| return legacyClusterID, newClusterID, nil |
| } |
| |
// clusterTagKey returns the tag key that marks a resource as belonging to this
// cluster: TagNameKubernetesClusterPrefix followed by the cluster ID.
// With an empty ClusterID the result is just the bare prefix.
func (t *awsTagging) clusterTagKey() string {
	return TagNameKubernetesClusterPrefix + t.ClusterID
}
| |
| func (t *awsTagging) hasClusterTag(tags []*ec2.Tag) bool { |
| // if the clusterID is not configured -- we consider all instances. |
| if len(t.ClusterID) == 0 { |
| return true |
| } |
| clusterTagKey := t.clusterTagKey() |
| for _, tag := range tags { |
| tagKey := aws.StringValue(tag.Key) |
| // For 1.6, we continue to recognize the legacy tags, for the 1.5 -> 1.6 upgrade |
| // Note that we want to continue traversing tag list if we see a legacy tag with value != ClusterID |
| if (tagKey == TagNameKubernetesClusterLegacy) && (aws.StringValue(tag.Value) == t.ClusterID) { |
| return true |
| } |
| if tagKey == clusterTagKey { |
| return true |
| } |
| } |
| return false |
| } |
| |
| // Ensure that a resource has the correct tags |
| // If it has no tags, we assume that this was a problem caused by an error in between creation and tagging, |
| // and we add the tags. If it has a different cluster's tags, that is an error. |
| func (t *awsTagging) readRepairClusterTags(client EC2, resourceID string, lifecycle ResourceLifecycle, additionalTags map[string]string, observedTags []*ec2.Tag) error { |
| actualTagMap := make(map[string]string) |
| for _, tag := range observedTags { |
| actualTagMap[aws.StringValue(tag.Key)] = aws.StringValue(tag.Value) |
| } |
| |
| expectedTags := t.buildTags(lifecycle, additionalTags) |
| |
| addTags := make(map[string]string) |
| for k, expected := range expectedTags { |
| actual := actualTagMap[k] |
| if actual == expected { |
| continue |
| } |
| if actual == "" { |
| klog.Warningf("Resource %q was missing expected cluster tag %q. Will add (with value %q)", resourceID, k, expected) |
| addTags[k] = expected |
| } else { |
| return fmt.Errorf("resource %q has tag belonging to another cluster: %q=%q (expected %q)", resourceID, k, actual, expected) |
| } |
| } |
| |
| if len(addTags) == 0 { |
| return nil |
| } |
| |
| if err := t.createTags(client, resourceID, lifecycle, addTags); err != nil { |
| return fmt.Errorf("error adding missing tags to resource %q: %q", resourceID, err) |
| } |
| |
| return nil |
| } |
| |
| // createTags calls EC2 CreateTags, but adds retry-on-failure logic |
| // We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency) |
| // The error code varies though (depending on what we are tagging), so we simply retry on all errors |
| func (t *awsTagging) createTags(client EC2, resourceID string, lifecycle ResourceLifecycle, additionalTags map[string]string) error { |
| tags := t.buildTags(lifecycle, additionalTags) |
| |
| if tags == nil || len(tags) == 0 { |
| return nil |
| } |
| |
| var awsTags []*ec2.Tag |
| for k, v := range tags { |
| tag := &ec2.Tag{ |
| Key: aws.String(k), |
| Value: aws.String(v), |
| } |
| awsTags = append(awsTags, tag) |
| } |
| |
| backoff := wait.Backoff{ |
| Duration: createTagInitialDelay, |
| Factor: createTagFactor, |
| Steps: createTagSteps, |
| } |
| request := &ec2.CreateTagsInput{} |
| request.Resources = []*string{&resourceID} |
| request.Tags = awsTags |
| |
| var lastErr error |
| err := wait.ExponentialBackoff(backoff, func() (bool, error) { |
| _, err := client.CreateTags(request) |
| if err == nil { |
| return true, nil |
| } |
| |
| // We could check that the error is retryable, but the error code changes based on what we are tagging |
| // SecurityGroup: InvalidGroup.NotFound |
| klog.V(2).Infof("Failed to create tags; will retry. Error was %q", err) |
| lastErr = err |
| return false, nil |
| }) |
| if err == wait.ErrWaitTimeout { |
| // return real CreateTags error instead of timeout |
| err = lastErr |
| } |
| return err |
| } |
| |
| // Add additional filters, to match on our tags |
| // This lets us run multiple k8s clusters in a single EC2 AZ |
| func (t *awsTagging) addFilters(filters []*ec2.Filter) []*ec2.Filter { |
| // if there are no clusterID configured - no filtering by special tag names |
| // should be applied to revert to legacy behaviour. |
| if len(t.ClusterID) == 0 { |
| if len(filters) == 0 { |
| // We can't pass a zero-length Filters to AWS (it's an error) |
| // So if we end up with no filters; just return nil |
| return nil |
| } |
| return filters |
| } |
| // For 1.6, we always recognize the legacy tag, for the 1.5 -> 1.6 upgrade |
| // There are no "or" filters by key, so we look for both the legacy and new key, and then we have to post-filter |
| f := newEc2Filter("tag-key", TagNameKubernetesClusterLegacy, t.clusterTagKey()) |
| |
| // We can't pass a zero-length Filters to AWS (it's an error) |
| // So if we end up with no filters; we need to return nil |
| filters = append(filters, f) |
| return filters |
| } |
| |
| func (t *awsTagging) buildTags(lifecycle ResourceLifecycle, additionalTags map[string]string) map[string]string { |
| tags := make(map[string]string) |
| for k, v := range additionalTags { |
| tags[k] = v |
| } |
| |
| // no clusterID is a sign of misconfigured cluster, but we can't be tagging the resources with empty |
| // strings |
| if len(t.ClusterID) == 0 { |
| return tags |
| } |
| |
| // We only create legacy tags if we are using legacy tags, i.e. if we have seen a legacy tag on our instance |
| if t.usesLegacyTags { |
| tags[TagNameKubernetesClusterLegacy] = t.ClusterID |
| } |
| tags[t.clusterTagKey()] = string(lifecycle) |
| |
| return tags |
| } |
| |
// clusterID returns the configured cluster identifier (the ClusterID field),
// which is empty if none has been set.
func (t *awsTagging) clusterID() string {
	return t.ClusterID
}