blob: b2efbac815029e96fe3c578c2b104f469ceb3eef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* Copyright 2021 gRPC authors.
*
*/
package resource
import (
"errors"
"fmt"
"strconv"
)
import (
v1udpatypepb "github.com/cncf/udpa/go/udpa/type/v1"
v3cncftypepb "github.com/cncf/xds/go/xds/type/v3"
dubbogoLogger "github.com/dubbogo/gost/log/logger"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"google.golang.org/protobuf/types/known/anypb"
)
import (
"dubbo.apache.org/dubbo-go/v3/xds/client/resource/version"
"dubbo.apache.org/dubbo-go/v3/xds/httpfilter"
"dubbo.apache.org/dubbo-go/v3/xds/utils/pretty"
)
// UnmarshalListener handles the resources received in an LDS response. Each
// resource is validated and converted into the native representation that
// keeps only the fields this package cares about. The work is delegated to
// processAllResources, which populates the returned map; the update metadata
// and any error it reports are passed back unchanged.
func UnmarshalListener(opts *UnmarshalOptions) (map[string]ListenerUpdateErrTuple, UpdateMetadata, error) {
	listeners := map[string]ListenerUpdateErrTuple{}
	metadata, err := processAllResources(opts, listeners)
	return listeners, metadata, err
}
// unmarshalListenerResource decodes one LDS resource wrapped in an Any.
//
// The resource is rejected if its type URL is not a Listener type. Otherwise
// the payload is unmarshaled into a v3 Listener message (a v2 type URL only
// sets the v2 flag handed to processListener), converted via processListener
// and, when f is non-nil, checked by f. On success the original Any is stored
// in the update's Raw field.
//
// The returned name is empty when the type URL is wrong or the payload cannot
// be decoded; on later failures the listener's name is returned together with
// a zero ListenerUpdate and the error.
func unmarshalListenerResource(r *anypb.Any, f UpdateValidatorFunc, logger dubbogoLogger.Logger) (string, ListenerUpdate, error) {
	typeURL := r.GetTypeUrl()
	if !IsListenerResource(typeURL) {
		return "", ListenerUpdate{}, fmt.Errorf("unexpected resource type: %q ", typeURL)
	}
	// TODO: Pass version.TransportAPI instead of relying upon the type URL
	v2 := typeURL == version.V2ListenerURL

	lis := &v3listenerpb.Listener{}
	if err := proto.Unmarshal(r.GetValue(), lis); err != nil {
		// Wrap with %w so callers can still inspect the underlying decode error.
		return "", ListenerUpdate{}, fmt.Errorf("failed to unmarshal resource: %w", err)
	}
	name := lis.GetName()
	dubbogoLogger.Debugf("Resource with name: %v, type: %T, contains: %v", name, lis, pretty.ToJSON(lis))

	lu, err := processListener(lis, logger, v2)
	if err != nil {
		return name, ListenerUpdate{}, err
	}
	// The validator sees the update before Raw is attached.
	if f != nil {
		if err := f(*lu); err != nil {
			return name, ListenerUpdate{}, err
		}
	}
	lu.Raw = r
	return name, *lu, nil
}
// processListener routes a Listener proto to the matching handler: a listener
// that carries an api_listener is processed as a client-side listener, any
// other listener as a server-side one. The v2 flag is forwarded only on the
// client-side path.
func processListener(lis *v3listenerpb.Listener, logger dubbogoLogger.Logger, v2 bool) (*ListenerUpdate, error) {
	if lis.GetApiListener() == nil {
		return processServerSideListener(lis, logger)
	}
	return processClientSideListener(lis, logger, v2)
}
// processClientSideListener validates a client-side Listener (one carrying an
// api_listener) and converts it into a ListenerUpdate.
//
// The api_listener must hold an HttpConnectionManager whose
// xff_num_trusted_hops is zero and whose original_ip_detection_extensions is
// empty. Its route specifier must be either RDS (with an ADS config source and
// a non-empty route_config_name, stored in RouteConfigName) or an inline
// RouteConfiguration (parsed and stored in InlineRouteConfig).
//
// When v2 is true the update is returned right after the route specifier has
// been handled; otherwise MaxStreamDuration and HTTPFilters are populated as
// well. Any validation failure yields a nil update and a non-nil error.
func processClientSideListener(lis *v3listenerpb.Listener, logger dubbogoLogger.Logger, v2 bool) (*ListenerUpdate, error) {
	update := &ListenerUpdate{}

	apiLisAny := lis.GetApiListener().GetApiListener()
	if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
		return nil, fmt.Errorf("unexpected resource type: %q", apiLisAny.GetTypeUrl())
	}
	apiLis := &v3httppb.HttpConnectionManager{}
	if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil {
		return nil, fmt.Errorf("failed to unmarshal api_listener: %w", err)
	}

	// "HttpConnectionManager.xff_num_trusted_hops must be unset or zero and
	// HttpConnectionManager.original_ip_detection_extensions must be empty. If
	// either field has an incorrect value, the Listener must be NACKed." - A41
	if apiLis.XffNumTrustedHops != 0 {
		return nil, fmt.Errorf("xff_num_trusted_hops must be unset or zero %+v", apiLis)
	}
	if len(apiLis.OriginalIpDetectionExtensions) != 0 {
		return nil, fmt.Errorf("original_ip_detection_extensions must be empty %+v", apiLis)
	}

	switch apiLis.RouteSpecifier.(type) {
	case *v3httppb.HttpConnectionManager_Rds:
		// Routes are fetched separately; only ADS-sourced RDS is accepted.
		if apiLis.GetRds().GetConfigSource().GetAds() == nil {
			return nil, fmt.Errorf("ConfigSource is not ADS: %+v", lis)
		}
		name := apiLis.GetRds().GetRouteConfigName()
		if name == "" {
			return nil, fmt.Errorf("empty route_config_name: %+v", lis)
		}
		update.RouteConfigName = name
	case *v3httppb.HttpConnectionManager_RouteConfig:
		// Routes are embedded in the listener itself.
		routeU, err := generateRDSUpdateFromRouteConfiguration(apiLis.GetRouteConfig(), logger, v2)
		if err != nil {
			return nil, fmt.Errorf("failed to parse inline RDS resp: %w", err)
		}
		update.InlineRouteConfig = &routeU
	case nil:
		return nil, fmt.Errorf("no RouteSpecifier: %+v", apiLis)
	default:
		return nil, fmt.Errorf("unsupported type %T for RouteSpecifier", apiLis.RouteSpecifier)
	}

	if v2 {
		return update, nil
	}

	// The following checks and fields only apply to xDS protocol versions v3+.
	update.MaxStreamDuration = apiLis.GetCommonHttpProtocolOptions().GetMaxStreamDuration().AsDuration()
	filters, err := processHTTPFilters(apiLis.GetHttpFilters(), false)
	if err != nil {
		return nil, err
	}
	update.HTTPFilters = filters
	return update, nil
}
// unwrapHTTPFilterConfig returns the message to hand to a filter's config
// parser together with the type URL that identifies the filter.
//
// If config holds a TypedStruct (either the xds v3 or the older udpa v1
// message), the TypedStruct is decoded and returned along with the type URL it
// carries. Any other config is returned unchanged with its own type URL.
func unwrapHTTPFilterConfig(config *anypb.Any) (proto.Message, string, error) {
	switch {
	case ptypes.Is(config, &v3cncftypepb.TypedStruct{}):
		// The real type name is inside the new TypedStruct message.
		s := new(v3cncftypepb.TypedStruct)
		if err := ptypes.UnmarshalAny(config, s); err != nil {
			return nil, "", fmt.Errorf("error unmarshaling TypedStruct filter config: %w", err)
		}
		return s, s.GetTypeUrl(), nil
	case ptypes.Is(config, &v1udpatypepb.TypedStruct{}):
		// The real type name is inside the old TypedStruct message.
		s := new(v1udpatypepb.TypedStruct)
		if err := ptypes.UnmarshalAny(config, s); err != nil {
			return nil, "", fmt.Errorf("error unmarshaling TypedStruct filter config: %w", err)
		}
		return s, s.GetTypeUrl(), nil
	default:
		return config, config.GetTypeUrl(), nil
	}
}
// validateHTTPFilterConfig looks up the filter implementation for cfg and
// parses cfg with it.
//
// cfg is first passed through unwrapHTTPFilterConfig to obtain the effective
// message and type URL; the type URL is then looked up with httpfilter.Get.
// If the lookup returns nil, the result is (nil, nil, nil) when optional is
// true and an error otherwise. When lds is true the config is parsed with
// ParseFilterConfig; when false, with ParseFilterConfigOverride.
func validateHTTPFilterConfig(cfg *anypb.Any, lds, optional bool) (httpfilter.Filter, httpfilter.FilterConfig, error) {
	config, typeURL, err := unwrapHTTPFilterConfig(cfg)
	if err != nil {
		return nil, nil, err
	}

	filterBuilder := httpfilter.Get(typeURL)
	if filterBuilder == nil {
		if optional {
			// Unknown but optional: callers treat a nil filter as "skip".
			return nil, nil, nil
		}
		return nil, nil, fmt.Errorf("no filter implementation found for %q", typeURL)
	}

	parseFunc := filterBuilder.ParseFilterConfig
	if !lds {
		parseFunc = filterBuilder.ParseFilterConfigOverride
	}
	filterConfig, err := parseFunc(config)
	if err != nil {
		return nil, nil, fmt.Errorf("error parsing config for filter %q: %w", typeURL, err)
	}
	return filterBuilder, filterConfig, nil
}
// processHTTPFilterOverrides parses a map of per-filter override configs,
// keyed by filter name, into parsed httpfilter.FilterConfig values.
//
// An entry wrapped in a route FilterConfig message is unwrapped first, and its
// is_optional flag is honored: entries for which validateHTTPFilterConfig
// returns a nil filter are left out of the result. An empty or nil input
// yields a nil map; the first failing entry aborts processing with an error
// that names the offending filter.
func processHTTPFilterOverrides(cfgs map[string]*anypb.Any) (map[string]httpfilter.FilterConfig, error) {
	if len(cfgs) == 0 {
		return nil, nil
	}
	overrides := make(map[string]httpfilter.FilterConfig, len(cfgs))
	for name, cfg := range cfgs {
		inner, optional := cfg, false
		wrapper := new(v3routepb.FilterConfig)
		if ptypes.Is(cfg, wrapper) {
			if err := ptypes.UnmarshalAny(cfg, wrapper); err != nil {
				return nil, fmt.Errorf("filter override %q: error unmarshaling FilterConfig: %w", name, err)
			}
			inner = wrapper.GetConfig()
			optional = wrapper.GetIsOptional()
		}

		// lds=false selects the override parser of the filter.
		httpFilter, config, err := validateHTTPFilterConfig(inner, false, optional)
		if err != nil {
			return nil, fmt.Errorf("filter override %q: %w", name, err)
		}
		if httpFilter == nil {
			// Optional configs are ignored.
			continue
		}
		overrides[name] = config
	}
	return overrides, nil
}
// processHTTPFilters validates the HTTP filter list of a connection manager
// and returns the filters to be used, in their original order.
//
// Every filter needs a non-empty name that is unique within the list. Filters
// for which validateHTTPFilterConfig returns a nil implementation are skipped.
// A filter that lacks the interceptor builder for the requested side (server
// when server is true, client otherwise) is skipped when marked optional and
// rejected otherwise. The resulting chain must be non-empty, and exactly its
// last filter must be terminal.
func processHTTPFilters(filters []*v3httppb.HttpFilter, server bool) ([]HTTPFilter, error) {
	chain := make([]HTTPFilter, 0, len(filters))
	used := make(map[string]bool, len(filters))

	for _, f := range filters {
		name := f.GetName()
		switch {
		case name == "":
			return nil, errors.New("filter missing name field")
		case used[name]:
			return nil, fmt.Errorf("duplicate filter name %q", name)
		}
		used[name] = true

		optional := f.GetIsOptional()
		builder, cfg, err := validateHTTPFilterConfig(f.GetTypedConfig(), true, optional)
		if err != nil {
			return nil, err
		}
		if builder == nil {
			// Optional configs are ignored.
			continue
		}

		// Check that the filter can build an interceptor for this side.
		var supported bool
		if server {
			_, supported = builder.(httpfilter.ServerInterceptorBuilder)
		} else {
			_, supported = builder.(httpfilter.ClientInterceptorBuilder)
		}

		switch {
		case supported:
			// Save name/config
			chain = append(chain, HTTPFilter{Name: name, Filter: builder, Config: cfg})
		case optional:
			// Unsupported on this side, but optional: drop it silently.
		case server:
			return nil, fmt.Errorf("HTTP filter %q not supported server-side", name)
		default:
			return nil, fmt.Errorf("HTTP filter %q not supported client-side", name)
		}
	}

	// "Validation will fail if a terminal filter is not the last filter in the
	// chain or if a non-terminal filter is the last filter in the chain." - A39
	if len(chain) == 0 {
		return nil, errors.New("http filters list is empty")
	}
	last := len(chain) - 1
	for idx := range chain[:last] {
		if chain[idx].Filter.IsTerminal() {
			return nil, fmt.Errorf("http filter %q is a terminal filter but it is not last in the filter chain", chain[idx].Name)
		}
	}
	if !chain[last].Filter.IsTerminal() {
		return nil, fmt.Errorf("http filter %q is not a terminal filter", chain[last].Name)
	}
	return chain, nil
}
// processServerSideListener validates a server-side Listener (one without an
// api_listener) and converts it into a ListenerUpdate whose InboundListenerCfg
// carries the listening address and port.
//
// The listener is rejected if it has any listener_filters, if
// use_original_dst is set to true, or if it lacks an address or a
// socket_address.
//
// NOTE(review): filter-chain construction is currently commented out, so the
// logger parameter is unused and InboundListenerCfg.FilterChains is not set
// here.
func processServerSideListener(lis *v3listenerpb.Listener, logger dubbogoLogger.Logger) (*ListenerUpdate, error) {
	if n := len(lis.ListenerFilters); n != 0 {
		return nil, fmt.Errorf("unsupported field 'listener_filters' contains %d entries", n)
	}
	// The generated getters are nil-safe, so an unset wrapper reads as false.
	if lis.GetUseOriginalDst().GetValue() {
		return nil, errors.New("unsupported field 'use_original_dst' is present and set to true")
	}
	addr := lis.GetAddress()
	if addr == nil {
		return nil, fmt.Errorf("no address field in LDS response: %+v", lis)
	}
	sockAddr := addr.GetSocketAddress()
	if sockAddr == nil {
		return nil, fmt.Errorf("no socket_address field in LDS response: %+v", lis)
	}
	lu := &ListenerUpdate{
		InboundListenerCfg: &InboundListenerConfig{
			Address: sockAddr.GetAddress(),
			// Format the uint32 port as unsigned: converting through int could
			// wrap to a negative number on 32-bit platforms.
			Port: strconv.FormatUint(uint64(sockAddr.GetPortValue()), 10),
		},
	}
	//fcMgr, err := NewFilterChainManager(lis, logger)
	//if err != nil {
	//	return nil, err
	//}
	//lu.InboundListenerCfg.FilterChains = fcMgr
	return lu, nil
}