| /* |
| Copyright 2017 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package aws |
| |
| import ( |
| "fmt" |
| "net/url" |
| "regexp" |
| "strings" |
| "sync" |
| "time" |
| |
| "github.com/aws/aws-sdk-go/aws" |
| "github.com/aws/aws-sdk-go/service/ec2" |
| "k8s.io/klog" |
| |
| "k8s.io/api/core/v1" |
| ) |
| |
| // awsInstanceRegMatch represents Regex Match for AWS instance. |
| var awsInstanceRegMatch = regexp.MustCompile("^i-[^/]*$") |
| |
| // InstanceID represents the ID of the instance in the AWS API, e.g. i-12345678 |
| // The "traditional" format is "i-12345678" |
| // A new longer format is also being introduced: "i-12345678abcdef01" |
| // We should not assume anything about the length or format, though it seems |
| // reasonable to assume that instances will continue to start with "i-". |
| type InstanceID string |
| |
| func (i InstanceID) awsString() *string { |
| return aws.String(string(i)) |
| } |
| |
| // KubernetesInstanceID represents the id for an instance in the kubernetes API; |
| // the following form |
| // * aws:///<zone>/<awsInstanceId> |
| // * aws:////<awsInstanceId> |
| // * <awsInstanceId> |
| type KubernetesInstanceID string |
| |
| // MapToAWSInstanceID extracts the InstanceID from the KubernetesInstanceID |
| func (name KubernetesInstanceID) MapToAWSInstanceID() (InstanceID, error) { |
| s := string(name) |
| |
| if !strings.HasPrefix(s, "aws://") { |
| // Assume a bare aws volume id (vol-1234...) |
| // Build a URL with an empty host (AZ) |
| s = "aws://" + "/" + "/" + s |
| } |
| url, err := url.Parse(s) |
| if err != nil { |
| return "", fmt.Errorf("Invalid instance name (%s): %v", name, err) |
| } |
| if url.Scheme != "aws" { |
| return "", fmt.Errorf("Invalid scheme for AWS instance (%s)", name) |
| } |
| |
| awsID := "" |
| tokens := strings.Split(strings.Trim(url.Path, "/"), "/") |
| if len(tokens) == 1 { |
| // instanceId |
| awsID = tokens[0] |
| } else if len(tokens) == 2 { |
| // az/instanceId |
| awsID = tokens[1] |
| } |
| |
| // We sanity check the resulting volume; the two known formats are |
| // i-12345678 and i-12345678abcdef01 |
| if awsID == "" || !awsInstanceRegMatch.MatchString(awsID) { |
| return "", fmt.Errorf("Invalid format for AWS instance (%s)", name) |
| } |
| |
| return InstanceID(awsID), nil |
| } |
| |
| // mapToAWSInstanceID extracts the InstanceIDs from the Nodes, returning an error if a Node cannot be mapped |
| func mapToAWSInstanceIDs(nodes []*v1.Node) ([]InstanceID, error) { |
| var instanceIDs []InstanceID |
| for _, node := range nodes { |
| if node.Spec.ProviderID == "" { |
| return nil, fmt.Errorf("node %q did not have ProviderID set", node.Name) |
| } |
| instanceID, err := KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID() |
| if err != nil { |
| return nil, fmt.Errorf("unable to parse ProviderID %q for node %q", node.Spec.ProviderID, node.Name) |
| } |
| instanceIDs = append(instanceIDs, instanceID) |
| } |
| |
| return instanceIDs, nil |
| } |
| |
| // mapToAWSInstanceIDsTolerant extracts the InstanceIDs from the Nodes, skipping Nodes that cannot be mapped |
| func mapToAWSInstanceIDsTolerant(nodes []*v1.Node) []InstanceID { |
| var instanceIDs []InstanceID |
| for _, node := range nodes { |
| if node.Spec.ProviderID == "" { |
| klog.Warningf("node %q did not have ProviderID set", node.Name) |
| continue |
| } |
| instanceID, err := KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID() |
| if err != nil { |
| klog.Warningf("unable to parse ProviderID %q for node %q", node.Spec.ProviderID, node.Name) |
| continue |
| } |
| instanceIDs = append(instanceIDs, instanceID) |
| } |
| |
| return instanceIDs |
| } |
| |
| // Gets the full information about this instance from the EC2 API |
| func describeInstance(ec2Client EC2, instanceID InstanceID) (*ec2.Instance, error) { |
| request := &ec2.DescribeInstancesInput{ |
| InstanceIds: []*string{instanceID.awsString()}, |
| } |
| |
| instances, err := ec2Client.DescribeInstances(request) |
| if err != nil { |
| return nil, err |
| } |
| if len(instances) == 0 { |
| return nil, fmt.Errorf("no instances found for instance: %s", instanceID) |
| } |
| if len(instances) > 1 { |
| return nil, fmt.Errorf("multiple instances found for instance: %s", instanceID) |
| } |
| return instances[0], nil |
| } |
| |
| // instanceCache manages the cache of DescribeInstances |
| type instanceCache struct { |
| // TODO: Get rid of this field, send all calls through the instanceCache |
| cloud *Cloud |
| |
| mutex sync.Mutex |
| snapshot *allInstancesSnapshot |
| } |
| |
| // Gets the full information about these instance from the EC2 API |
| func (c *instanceCache) describeAllInstancesUncached() (*allInstancesSnapshot, error) { |
| now := time.Now() |
| |
| klog.V(4).Infof("EC2 DescribeInstances - fetching all instances") |
| |
| filters := []*ec2.Filter{} |
| instances, err := c.cloud.describeInstances(filters) |
| if err != nil { |
| return nil, err |
| } |
| |
| m := make(map[InstanceID]*ec2.Instance) |
| for _, i := range instances { |
| id := InstanceID(aws.StringValue(i.InstanceId)) |
| m[id] = i |
| } |
| |
| snapshot := &allInstancesSnapshot{now, m} |
| |
| c.mutex.Lock() |
| defer c.mutex.Unlock() |
| |
| if c.snapshot != nil && snapshot.olderThan(c.snapshot) { |
| // If this happens a lot, we could run this function in a mutex and only return one result |
| klog.Infof("Not caching concurrent AWS DescribeInstances results") |
| } else { |
| c.snapshot = snapshot |
| } |
| |
| return snapshot, nil |
| } |
| |
| // cacheCriteria holds criteria that must hold to use a cached snapshot |
| type cacheCriteria struct { |
| // MaxAge indicates the maximum age of a cached snapshot we can accept. |
| // If set to 0 (i.e. unset), cached values will not time out because of age. |
| MaxAge time.Duration |
| |
| // HasInstances is a list of InstanceIDs that must be in a cached snapshot for it to be considered valid. |
| // If an instance is not found in the cached snapshot, the snapshot be ignored and we will re-fetch. |
| HasInstances []InstanceID |
| } |
| |
| // describeAllInstancesCached returns all instances, using cached results if applicable |
| func (c *instanceCache) describeAllInstancesCached(criteria cacheCriteria) (*allInstancesSnapshot, error) { |
| var err error |
| snapshot := c.getSnapshot() |
| if snapshot != nil && !snapshot.MeetsCriteria(criteria) { |
| snapshot = nil |
| } |
| |
| if snapshot == nil { |
| snapshot, err = c.describeAllInstancesUncached() |
| if err != nil { |
| return nil, err |
| } |
| } else { |
| klog.V(6).Infof("EC2 DescribeInstances - using cached results") |
| } |
| |
| return snapshot, nil |
| } |
| |
| // getSnapshot returns a snapshot if one exists |
| func (c *instanceCache) getSnapshot() *allInstancesSnapshot { |
| c.mutex.Lock() |
| defer c.mutex.Unlock() |
| |
| return c.snapshot |
| } |
| |
| // olderThan is a simple helper to encapsulate timestamp comparison |
| func (s *allInstancesSnapshot) olderThan(other *allInstancesSnapshot) bool { |
| // After() is technically broken by time changes until we have monotonic time |
| return other.timestamp.After(s.timestamp) |
| } |
| |
| // MeetsCriteria returns true if the snapshot meets the criteria in cacheCriteria |
| func (s *allInstancesSnapshot) MeetsCriteria(criteria cacheCriteria) bool { |
| if criteria.MaxAge > 0 { |
| // Sub() is technically broken by time changes until we have monotonic time |
| now := time.Now() |
| if now.Sub(s.timestamp) > criteria.MaxAge { |
| klog.V(6).Infof("instanceCache snapshot cannot be used as is older than MaxAge=%s", criteria.MaxAge) |
| return false |
| } |
| } |
| |
| if len(criteria.HasInstances) != 0 { |
| for _, id := range criteria.HasInstances { |
| if nil == s.instances[id] { |
| klog.V(6).Infof("instanceCache snapshot cannot be used as does not contain instance %s", id) |
| return false |
| } |
| } |
| } |
| |
| return true |
| } |
| |
| // allInstancesSnapshot holds the results from querying for all instances, |
| // along with the timestamp for cache-invalidation purposes |
| type allInstancesSnapshot struct { |
| timestamp time.Time |
| instances map[InstanceID]*ec2.Instance |
| } |
| |
| // FindInstances returns the instances corresponding to the specified ids. If an id is not found, it is ignored. |
| func (s *allInstancesSnapshot) FindInstances(ids []InstanceID) map[InstanceID]*ec2.Instance { |
| m := make(map[InstanceID]*ec2.Instance) |
| for _, id := range ids { |
| instance := s.instances[id] |
| if instance != nil { |
| m[id] = instance |
| } |
| } |
| return m |
| } |