| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /* |
| * |
| * Copyright 2021 gRPC authors. |
| * |
| */ |
| |
| package resource |
| |
| import ( |
| "errors" |
| "fmt" |
| "net" |
| "strconv" |
| ) |
| |
| import ( |
| dubbogoLogger "github.com/dubbogo/gost/log/logger" |
| |
| v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" |
| v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" |
| v3aggregateclusterpb "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/aggregate/v3" |
| v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" |
| |
| "github.com/golang/protobuf/proto" |
| |
| "google.golang.org/protobuf/types/known/anypb" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/xds/client/resource/version" |
| "dubbo.apache.org/dubbo-go/v3/xds/utils/envconfig" |
| "dubbo.apache.org/dubbo-go/v3/xds/utils/matcher" |
| "dubbo.apache.org/dubbo-go/v3/xds/utils/pretty" |
| ) |
| |
| // TransportSocket proto message has a `name` field which is expected to be set |
| // to this value by the management server. |
| const transportSocketName = "envoy.transport_sockets.tls" |
| |
| // UnmarshalCluster processes resources received in an CDS response, validates |
| // them, and transforms them into a native struct which contains only fields we |
| // are interested in. |
| func UnmarshalCluster(opts *UnmarshalOptions) (map[string]ClusterUpdateErrTuple, UpdateMetadata, error) { |
| update := make(map[string]ClusterUpdateErrTuple) |
| md, err := processAllResources(opts, update) |
| return update, md, err |
| } |
| |
| func unmarshalClusterResource(r *anypb.Any, f UpdateValidatorFunc, logger dubbogoLogger.Logger) (string, ClusterUpdate, error) { |
| if !IsClusterResource(r.GetTypeUrl()) { |
| return "", ClusterUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl()) |
| } |
| |
| cluster := &v3clusterpb.Cluster{} |
| if err := proto.Unmarshal(r.GetValue(), cluster); err != nil { |
| return "", ClusterUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err) |
| } |
| dubbogoLogger.Debugf("Resource with name: %v, type: %T, contains: %v", cluster.GetName(), cluster, pretty.ToJSON(cluster)) |
| cu, err := validateClusterAndConstructClusterUpdate(cluster) |
| if err != nil { |
| return cluster.GetName(), ClusterUpdate{}, err |
| } |
| cu.Raw = r |
| if f != nil { |
| if err := f(cu); err != nil { |
| return "", ClusterUpdate{}, err |
| } |
| } |
| |
| return cluster.GetName(), cu, nil |
| } |
| |
| const ( |
| defaultRingHashMinSize = 1024 |
| defaultRingHashMaxSize = 8 * 1024 * 1024 // 8M |
| ringHashSizeUpperBound = 8 * 1024 * 1024 // 8M |
| ) |
| |
| func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) { |
| var lbPolicy *ClusterLBPolicyRingHash |
| // todo @(laurence) this direct set |
| cluster.LbPolicy = v3clusterpb.Cluster_ROUND_ROBIN |
| switch cluster.GetLbPolicy() { |
| case v3clusterpb.Cluster_ROUND_ROBIN: |
| lbPolicy = nil // The default is round_robin, and there's no config to set. |
| case v3clusterpb.Cluster_RING_HASH: |
| if !envconfig.XDSRingHash { |
| return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster) |
| } |
| rhc := cluster.GetRingHashLbConfig() |
| if rhc.GetHashFunction() != v3clusterpb.Cluster_RingHashLbConfig_XX_HASH { |
| return ClusterUpdate{}, fmt.Errorf("unsupported ring_hash hash function %v in response: %+v", rhc.GetHashFunction(), cluster) |
| } |
| // Minimum defaults to 1024 entries, and limited to 8M entries Maximum |
| // defaults to 8M entries, and limited to 8M entries |
| var minSize, maxSize uint64 = defaultRingHashMinSize, defaultRingHashMaxSize |
| if min := rhc.GetMinimumRingSize(); min != nil { |
| if min.GetValue() > ringHashSizeUpperBound { |
| return ClusterUpdate{}, fmt.Errorf("unexpected ring_hash mininum ring size %v in response: %+v", min.GetValue(), cluster) |
| } |
| minSize = min.GetValue() |
| } |
| if max := rhc.GetMaximumRingSize(); max != nil { |
| if max.GetValue() > ringHashSizeUpperBound { |
| return ClusterUpdate{}, fmt.Errorf("unexpected ring_hash maxinum ring size %v in response: %+v", max.GetValue(), cluster) |
| } |
| maxSize = max.GetValue() |
| } |
| if minSize > maxSize { |
| return ClusterUpdate{}, fmt.Errorf("ring_hash config min size %v is greater than max %v", minSize, maxSize) |
| } |
| lbPolicy = &ClusterLBPolicyRingHash{MinimumRingSize: minSize, MaximumRingSize: maxSize} |
| default: |
| return ClusterUpdate{}, fmt.Errorf("unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster) |
| } |
| |
| // Process security configuration received from the control plane iff the |
| // corresponding environment variable is set. |
| var sc *SecurityConfig |
| if envconfig.XDSClientSideSecurity { |
| var err error |
| if sc, err = securityConfigFromCluster(cluster); err != nil { |
| return ClusterUpdate{}, err |
| } |
| } |
| |
| ret := ClusterUpdate{ |
| ClusterName: cluster.GetName(), |
| EnableLRS: cluster.GetLrsServer().GetSelf() != nil, |
| SecurityCfg: sc, |
| MaxRequests: circuitBreakersFromCluster(cluster), |
| LBPolicy: lbPolicy, |
| } |
| |
| // Validate and set cluster type from the response. |
| // todo @laurence this set cluster |
| if x, ok := cluster.GetClusterDiscoveryType().(*v3clusterpb.Cluster_Type); ok { |
| x.Type = v3clusterpb.Cluster_EDS |
| } |
| switch { |
| case cluster.GetType() == v3clusterpb.Cluster_EDS: |
| if cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil { |
| return ClusterUpdate{}, fmt.Errorf("unexpected edsConfig in response: %+v", cluster) |
| } |
| ret.ClusterType = ClusterTypeEDS |
| ret.EDSServiceName = cluster.GetEdsClusterConfig().GetServiceName() |
| return ret, nil |
| case cluster.GetType() == v3clusterpb.Cluster_LOGICAL_DNS: |
| if !envconfig.XDSAggregateAndDNS { |
| return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster) |
| } |
| ret.ClusterType = ClusterTypeLogicalDNS |
| dnsHN, err := dnsHostNameFromCluster(cluster) |
| if err != nil { |
| return ClusterUpdate{}, err |
| } |
| ret.DNSHostName = dnsHN |
| return ret, nil |
| case cluster.GetClusterType() != nil && cluster.GetClusterType().Name == "envoy.clusters.aggregate": |
| if !envconfig.XDSAggregateAndDNS { |
| return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster) |
| } |
| clusters := &v3aggregateclusterpb.ClusterConfig{} |
| if err := proto.Unmarshal(cluster.GetClusterType().GetTypedConfig().GetValue(), clusters); err != nil { |
| return ClusterUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err) |
| } |
| ret.ClusterType = ClusterTypeAggregate |
| ret.PrioritizedClusterNames = clusters.Clusters |
| return ret, nil |
| default: |
| return ClusterUpdate{}, fmt.Errorf("unsupported cluster type (%v, %v) in response: %+v", cluster.GetType(), cluster.GetClusterType(), cluster) |
| } |
| } |
| |
| // dnsHostNameFromCluster extracts the DNS host name from the cluster's load |
| // assignment. |
| // |
| // There should be exactly one locality, with one endpoint, whose address |
| // contains the address and port. |
| func dnsHostNameFromCluster(cluster *v3clusterpb.Cluster) (string, error) { |
| loadAssignment := cluster.GetLoadAssignment() |
| if loadAssignment == nil { |
| return "", fmt.Errorf("load_assignment not present for LOGICAL_DNS cluster") |
| } |
| if len(loadAssignment.GetEndpoints()) != 1 { |
| return "", fmt.Errorf("load_assignment for LOGICAL_DNS cluster must have exactly one locality, got: %+v", loadAssignment) |
| } |
| endpoints := loadAssignment.GetEndpoints()[0].GetLbEndpoints() |
| if len(endpoints) != 1 { |
| return "", fmt.Errorf("locality for LOGICAL_DNS cluster must have exactly one endpoint, got: %+v", endpoints) |
| } |
| endpoint := endpoints[0].GetEndpoint() |
| if endpoint == nil { |
| return "", fmt.Errorf("endpoint for LOGICAL_DNS cluster not set") |
| } |
| socketAddr := endpoint.GetAddress().GetSocketAddress() |
| if socketAddr == nil { |
| return "", fmt.Errorf("socket address for endpoint for LOGICAL_DNS cluster not set") |
| } |
| if socketAddr.GetResolverName() != "" { |
| return "", fmt.Errorf("socket address for endpoint for LOGICAL_DNS cluster not set has unexpected custom resolver name: %v", socketAddr.GetResolverName()) |
| } |
| host := socketAddr.GetAddress() |
| if host == "" { |
| return "", fmt.Errorf("host for endpoint for LOGICAL_DNS cluster not set") |
| } |
| port := socketAddr.GetPortValue() |
| if port == 0 { |
| return "", fmt.Errorf("port for endpoint for LOGICAL_DNS cluster not set") |
| } |
| return net.JoinHostPort(host, strconv.Itoa(int(port))), nil |
| } |
| |
| // securityConfigFromCluster extracts the relevant security configuration from |
| // the received Cluster resource. |
| func securityConfigFromCluster(cluster *v3clusterpb.Cluster) (*SecurityConfig, error) { |
| if tsm := cluster.GetTransportSocketMatches(); len(tsm) != 0 { |
| return nil, fmt.Errorf("unsupport transport_socket_matches field is non-empty: %+v", tsm) |
| } |
| // The Cluster resource contains a `transport_socket` field, which contains |
| // a oneof `typed_config` field of type `protobuf.Any`. The any proto |
| // contains a marshaled representation of an `UpstreamTlsContext` message. |
| ts := cluster.GetTransportSocket() |
| if ts == nil { |
| return nil, nil |
| } |
| if name := ts.GetName(); name != transportSocketName { |
| return nil, fmt.Errorf("transport_socket field has unexpected name: %s", name) |
| } |
| any := ts.GetTypedConfig() |
| if any == nil || any.TypeUrl != version.V3UpstreamTLSContextURL { |
| return nil, fmt.Errorf("transport_socket field has unexpected typeURL: %s", any.TypeUrl) |
| } |
| upstreamCtx := &v3tlspb.UpstreamTlsContext{} |
| if err := proto.Unmarshal(any.GetValue(), upstreamCtx); err != nil { |
| return nil, fmt.Errorf("failed to unmarshal UpstreamTlsContext in CDS response: %v", err) |
| } |
| // The following fields from `UpstreamTlsContext` are ignored: |
| // - sni |
| // - allow_renegotiation |
| // - max_session_keys |
| if upstreamCtx.GetCommonTlsContext() == nil { |
| return nil, errors.New("UpstreamTlsContext in CDS response does not contain a CommonTlsContext") |
| } |
| |
| return securityConfigFromCommonTLSContext(upstreamCtx.GetCommonTlsContext(), false) |
| } |
| |
| // common is expected to be not nil. |
| // The `alpn_protocols` field is ignored. |
| func securityConfigFromCommonTLSContext(common *v3tlspb.CommonTlsContext, server bool) (*SecurityConfig, error) { |
| if common.GetTlsParams() != nil { |
| return nil, fmt.Errorf("unsupported tls_params field in CommonTlsContext message: %+v", common) |
| } |
| if common.GetCustomHandshaker() != nil { |
| return nil, fmt.Errorf("unsupported custom_handshaker field in CommonTlsContext message: %+v", common) |
| } |
| |
| // For now, if we can't get a valid security config from the new fields, we |
| // fallback to the old deprecated fields. |
| // TODO: Drop support for deprecated fields. NACK if err != nil here. |
| sc, _ := securityConfigFromCommonTLSContextUsingNewFields(common, server) |
| if sc == nil || sc.Equal(&SecurityConfig{}) { |
| var err error |
| sc, err = securityConfigFromCommonTLSContextWithDeprecatedFields(common, server) |
| if err != nil { |
| return nil, err |
| } |
| } |
| if sc != nil { |
| // sc == nil is a valid case where the control plane has not sent us any |
| // security configuration. xDS creds will use fallback creds. |
| if server { |
| if sc.IdentityInstanceName == "" { |
| return nil, errors.New("security configuration on the server-side does not contain identity certificate provider instance name") |
| } |
| } else { |
| if sc.RootInstanceName == "" { |
| return nil, errors.New("security configuration on the client-side does not contain root certificate provider instance name") |
| } |
| } |
| } |
| return sc, nil |
| } |
| |
| func securityConfigFromCommonTLSContextWithDeprecatedFields(common *v3tlspb.CommonTlsContext, server bool) (*SecurityConfig, error) { |
| // The `CommonTlsContext` contains a |
| // `tls_certificate_certificate_provider_instance` field of type |
| // `CertificateProviderInstance`, which contains the provider instance name |
| // and the certificate name to fetch identity certs. |
| sc := &SecurityConfig{} |
| if identity := common.GetTlsCertificateCertificateProviderInstance(); identity != nil { |
| sc.IdentityInstanceName = identity.GetInstanceName() |
| sc.IdentityCertName = identity.GetCertificateName() |
| } |
| |
| // The `CommonTlsContext` contains a `validation_context_type` field which |
| // is a oneof. We can get the values that we are interested in from two of |
| // those possible values: |
| // - combined validation context: |
| // - contains a default validation context which holds the list of |
| // matchers for accepted SANs. |
| // - contains certificate provider instance configuration |
| // - certificate provider instance configuration |
| // - in this case, we do not get a list of accepted SANs. |
| switch t := common.GetValidationContextType().(type) { |
| case *v3tlspb.CommonTlsContext_CombinedValidationContext: |
| combined := common.GetCombinedValidationContext() |
| var matchers []matcher.StringMatcher |
| if def := combined.GetDefaultValidationContext(); def != nil { |
| for _, m := range def.GetMatchSubjectAltNames() { |
| matcher, err := matcher.StringMatcherFromProto(m) |
| if err != nil { |
| return nil, err |
| } |
| matchers = append(matchers, matcher) |
| } |
| } |
| if server && len(matchers) != 0 { |
| return nil, fmt.Errorf("match_subject_alt_names field in validation context is not supported on the server: %v", common) |
| } |
| sc.SubjectAltNameMatchers = matchers |
| if pi := combined.GetValidationContextCertificateProviderInstance(); pi != nil { |
| sc.RootInstanceName = pi.GetInstanceName() |
| sc.RootCertName = pi.GetCertificateName() |
| } |
| case *v3tlspb.CommonTlsContext_ValidationContextCertificateProviderInstance: |
| pi := common.GetValidationContextCertificateProviderInstance() |
| sc.RootInstanceName = pi.GetInstanceName() |
| sc.RootCertName = pi.GetCertificateName() |
| case nil: |
| // It is valid for the validation context to be nil on the server side. |
| default: |
| return nil, fmt.Errorf("validation context contains unexpected type: %T", t) |
| } |
| return sc, nil |
| } |
| |
| // gRFC A29 https://github.com/grpc/proposal/blob/master/A29-xds-tls-security.md |
| // specifies the new way to fetch security configuration and says the following: |
| // |
| // Although there are various ways to obtain certificates as per this proto |
| // (which are supported by Envoy), gRPC supports only one of them and that is |
| // the `CertificateProviderPluginInstance` proto. |
| // |
| // This helper function attempts to fetch security configuration from the |
| // `CertificateProviderPluginInstance` message, given a CommonTlsContext. |
| func securityConfigFromCommonTLSContextUsingNewFields(common *v3tlspb.CommonTlsContext, server bool) (*SecurityConfig, error) { |
| // The `tls_certificate_provider_instance` field of type |
| // `CertificateProviderPluginInstance` is used to fetch the identity |
| // certificate provider. |
| sc := &SecurityConfig{} |
| identity := common.GetTlsCertificateProviderInstance() |
| if identity == nil && len(common.GetTlsCertificates()) != 0 { |
| return nil, fmt.Errorf("expected field tls_certificate_provider_instance is not set, while unsupported field tls_certificates is set in CommonTlsContext message: %+v", common) |
| } |
| if identity == nil && common.GetTlsCertificateSdsSecretConfigs() != nil { |
| return nil, fmt.Errorf("expected field tls_certificate_provider_instance is not set, while unsupported field tls_certificate_sds_secret_configs is set in CommonTlsContext message: %+v", common) |
| } |
| sc.IdentityInstanceName = identity.GetInstanceName() |
| sc.IdentityCertName = identity.GetCertificateName() |
| |
| // The `CommonTlsContext` contains a oneof field `validation_context_type`, |
| // which contains the `CertificateValidationContext` message in one of the |
| // following ways: |
| // - `validation_context` field |
| // - this is directly of type `CertificateValidationContext` |
| // - `combined_validation_context` field |
| // - this is of type `CombinedCertificateValidationContext` and contains |
| // a `default validation context` field of type |
| // `CertificateValidationContext` |
| // |
| // The `CertificateValidationContext` message has the following fields that |
| // we are interested in: |
| // - `ca_certificate_provider_instance` |
| // - this is of type `CertificateProviderPluginInstance` |
| // - `match_subject_alt_names` |
| // - this is a list of string matchers |
| // |
| // The `CertificateProviderPluginInstance` message contains two fields |
| // - instance_name |
| // - this is the certificate provider instance name to be looked up in |
| // the bootstrap configuration |
| // - certificate_name |
| // - this is an opaque name passed to the certificate provider |
| var validationCtx *v3tlspb.CertificateValidationContext |
| switch typ := common.GetValidationContextType().(type) { |
| case *v3tlspb.CommonTlsContext_ValidationContext: |
| validationCtx = common.GetValidationContext() |
| case *v3tlspb.CommonTlsContext_CombinedValidationContext: |
| validationCtx = common.GetCombinedValidationContext().GetDefaultValidationContext() |
| case nil: |
| // It is valid for the validation context to be nil on the server side. |
| return sc, nil |
| default: |
| return nil, fmt.Errorf("validation context contains unexpected type: %T", typ) |
| } |
| // If we get here, it means that the `CertificateValidationContext` message |
| // was found through one of the supported ways. It is an error if the |
| // validation context is specified, but it does not contain the |
| // ca_certificate_provider_instance field which contains information about |
| // the certificate provider to be used for the root certificates. |
| if validationCtx.GetCaCertificateProviderInstance() == nil { |
| return nil, fmt.Errorf("expected field ca_certificate_provider_instance is missing in CommonTlsContext message: %+v", common) |
| } |
| // The following fields are ignored: |
| // - trusted_ca |
| // - watched_directory |
| // - allow_expired_certificate |
| // - trust_chain_verification |
| switch { |
| case len(validationCtx.GetVerifyCertificateSpki()) != 0: |
| return nil, fmt.Errorf("unsupported verify_certificate_spki field in CommonTlsContext message: %+v", common) |
| case len(validationCtx.GetVerifyCertificateHash()) != 0: |
| return nil, fmt.Errorf("unsupported verify_certificate_hash field in CommonTlsContext message: %+v", common) |
| case validationCtx.GetRequireSignedCertificateTimestamp().GetValue(): |
| return nil, fmt.Errorf("unsupported require_sugned_ceritificate_timestamp field in CommonTlsContext message: %+v", common) |
| case validationCtx.GetCrl() != nil: |
| return nil, fmt.Errorf("unsupported crl field in CommonTlsContext message: %+v", common) |
| case validationCtx.GetCustomValidatorConfig() != nil: |
| return nil, fmt.Errorf("unsupported custom_validator_config field in CommonTlsContext message: %+v", common) |
| } |
| |
| if rootProvider := validationCtx.GetCaCertificateProviderInstance(); rootProvider != nil { |
| sc.RootInstanceName = rootProvider.GetInstanceName() |
| sc.RootCertName = rootProvider.GetCertificateName() |
| } |
| var matchers []matcher.StringMatcher |
| for _, m := range validationCtx.GetMatchSubjectAltNames() { |
| matcher, err := matcher.StringMatcherFromProto(m) |
| if err != nil { |
| return nil, err |
| } |
| matchers = append(matchers, matcher) |
| } |
| if server && len(matchers) != 0 { |
| return nil, fmt.Errorf("match_subject_alt_names field in validation context is not supported on the server: %v", common) |
| } |
| sc.SubjectAltNameMatchers = matchers |
| return sc, nil |
| } |
| |
| // circuitBreakersFromCluster extracts the circuit breakers configuration from |
| // the received cluster resource. Returns nil if no CircuitBreakers or no |
| // Thresholds in CircuitBreakers. |
| func circuitBreakersFromCluster(cluster *v3clusterpb.Cluster) *uint32 { |
| for _, threshold := range cluster.GetCircuitBreakers().GetThresholds() { |
| if threshold.GetPriority() != v3corepb.RoutingPriority_DEFAULT { |
| continue |
| } |
| maxRequestsPb := threshold.GetMaxRequests() |
| if maxRequestsPb == nil { |
| return nil |
| } |
| maxRequests := maxRequestsPb.GetValue() |
| return &maxRequests |
| } |
| return nil |
| } |