| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /* |
| * |
| * Copyright 2021 gRPC authors. |
| * |
| */ |
| |
| // Package fault implements the Envoy Fault Injection HTTP filter. |
| package fault |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "strconv" |
| "sync/atomic" |
| "time" |
| ) |
| |
| import ( |
| cpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/common/fault/v3" |
| fpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/fault/v3" |
| tpb "github.com/envoyproxy/go-control-plane/envoy/type/v3" |
| |
| "github.com/golang/protobuf/proto" |
| "github.com/golang/protobuf/ptypes" |
| |
| "google.golang.org/grpc/codes" |
| |
| "google.golang.org/grpc/metadata" |
| |
| "google.golang.org/grpc/status" |
| |
| "google.golang.org/protobuf/types/known/anypb" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/xds/httpfilter" |
| "dubbo.apache.org/dubbo-go/v3/xds/utils/grpcrand" |
| iresolver "dubbo.apache.org/dubbo-go/v3/xds/utils/resolver" |
| ) |
| |
| const headerAbortHTTPStatus = "x-envoy-fault-abort-request" |
| const headerAbortGRPCStatus = "x-envoy-fault-abort-grpc-request" |
| const headerAbortPercentage = "x-envoy-fault-abort-request-percentage" |
| |
| const headerDelayPercentage = "x-envoy-fault-delay-request-percentage" |
| const headerDelayDuration = "x-envoy-fault-delay-request" |
| |
| var statusMap = map[int]codes.Code{ |
| 400: codes.Internal, |
| 401: codes.Unauthenticated, |
| 403: codes.PermissionDenied, |
| 404: codes.Unimplemented, |
| 429: codes.Unavailable, |
| 502: codes.Unavailable, |
| 503: codes.Unavailable, |
| 504: codes.Unavailable, |
| } |
| |
| func init() { |
| httpfilter.Register(builder{}) |
| } |
| |
| type builder struct { |
| } |
| |
| type config struct { |
| httpfilter.FilterConfig |
| config *fpb.HTTPFault |
| } |
| |
| func (builder) TypeURLs() []string { |
| return []string{"type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault"} |
| } |
| |
| // Parsing is the same for the base config and the override config. |
| func parseConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { |
| if cfg == nil { |
| return nil, fmt.Errorf("fault: nil configuration message provided") |
| } |
| any, ok := cfg.(*anypb.Any) |
| if !ok { |
| return nil, fmt.Errorf("fault: error parsing config %v: unknown type %T", cfg, cfg) |
| } |
| msg := new(fpb.HTTPFault) |
| if err := ptypes.UnmarshalAny(any, msg); err != nil { |
| return nil, fmt.Errorf("fault: error parsing config %v: %v", cfg, err) |
| } |
| return config{config: msg}, nil |
| } |
| |
| func (builder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { |
| return parseConfig(cfg) |
| } |
| |
| func (builder) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) { |
| return parseConfig(override) |
| } |
| |
| func (builder) IsTerminal() bool { |
| return false |
| } |
| |
| var _ httpfilter.ClientInterceptorBuilder = builder{} |
| |
| func (builder) BuildClientInterceptor(cfg, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) { |
| if cfg == nil { |
| return nil, fmt.Errorf("fault: nil config provided") |
| } |
| |
| c, ok := cfg.(config) |
| if !ok { |
| return nil, fmt.Errorf("fault: incorrect config type provided (%T): %v", cfg, cfg) |
| } |
| |
| if override != nil { |
| // override completely replaces the listener configuration; but we |
| // still validate the listener config type. |
| c, ok = override.(config) |
| if !ok { |
| return nil, fmt.Errorf("fault: incorrect override config type provided (%T): %v", override, override) |
| } |
| } |
| |
| icfg := c.config |
| if (icfg.GetMaxActiveFaults() != nil && icfg.GetMaxActiveFaults().GetValue() == 0) || |
| (icfg.GetDelay() == nil && icfg.GetAbort() == nil) { |
| return nil, nil |
| } |
| return &interceptor{config: icfg}, nil |
| } |
| |
| type interceptor struct { |
| config *fpb.HTTPFault |
| } |
| |
| var activeFaults uint32 // global active faults; accessed atomically |
| |
| func (i *interceptor) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) { |
| if maxAF := i.config.GetMaxActiveFaults(); maxAF != nil { |
| defer atomic.AddUint32(&activeFaults, ^uint32(0)) // decrement counter |
| if af := atomic.AddUint32(&activeFaults, 1); af > maxAF.GetValue() { |
| // Would exceed maximum active fault limit. |
| return newStream(ctx, done) |
| } |
| } |
| |
| if err := injectDelay(ctx, i.config.GetDelay()); err != nil { |
| return nil, err |
| } |
| |
| if err := injectAbort(ctx, i.config.GetAbort()); err != nil { |
| if err == errOKStream { |
| return &okStream{ctx: ctx}, nil |
| } |
| return nil, err |
| } |
| return newStream(ctx, done) |
| } |
| |
| // For overriding in tests |
| var randIntn = grpcrand.Intn |
| var newTimer = time.NewTimer |
| |
| func injectDelay(ctx context.Context, delayCfg *cpb.FaultDelay) error { |
| numerator, denominator := splitPct(delayCfg.GetPercentage()) |
| var delay time.Duration |
| switch delayType := delayCfg.GetFaultDelaySecifier().(type) { |
| case *cpb.FaultDelay_FixedDelay: |
| delay = delayType.FixedDelay.AsDuration() |
| case *cpb.FaultDelay_HeaderDelay_: |
| md, _ := metadata.FromOutgoingContext(ctx) |
| v := md[headerDelayDuration] |
| if v == nil { |
| // No delay configured for this RPC. |
| return nil |
| } |
| ms, ok := parseIntFromMD(v) |
| if !ok { |
| // Malformed header; no delay. |
| return nil |
| } |
| delay = time.Duration(ms) * time.Millisecond |
| if v := md[headerDelayPercentage]; v != nil { |
| if num, ok := parseIntFromMD(v); ok && num < numerator { |
| numerator = num |
| } |
| } |
| } |
| if delay == 0 || randIntn(denominator) >= numerator { |
| return nil |
| } |
| t := newTimer(delay) |
| select { |
| case <-t.C: |
| case <-ctx.Done(): |
| t.Stop() |
| return ctx.Err() |
| } |
| return nil |
| } |
| |
| func injectAbort(ctx context.Context, abortCfg *fpb.FaultAbort) error { |
| numerator, denominator := splitPct(abortCfg.GetPercentage()) |
| code := codes.OK |
| okCode := false |
| switch errType := abortCfg.GetErrorType().(type) { |
| case *fpb.FaultAbort_HttpStatus: |
| code, okCode = grpcFromHTTP(int(errType.HttpStatus)) |
| case *fpb.FaultAbort_GrpcStatus: |
| code, okCode = sanitizeGRPCCode(codes.Code(errType.GrpcStatus)), true |
| case *fpb.FaultAbort_HeaderAbort_: |
| md, _ := metadata.FromOutgoingContext(ctx) |
| if v := md[headerAbortHTTPStatus]; v != nil { |
| // HTTP status has priority over gRPC status. |
| if httpStatus, ok := parseIntFromMD(v); ok { |
| code, okCode = grpcFromHTTP(httpStatus) |
| } |
| } else if v := md[headerAbortGRPCStatus]; v != nil { |
| if grpcStatus, ok := parseIntFromMD(v); ok { |
| code, okCode = sanitizeGRPCCode(codes.Code(grpcStatus)), true |
| } |
| } |
| if v := md[headerAbortPercentage]; v != nil { |
| if num, ok := parseIntFromMD(v); ok && num < numerator { |
| numerator = num |
| } |
| } |
| } |
| if !okCode || randIntn(denominator) >= numerator { |
| return nil |
| } |
| if code == codes.OK { |
| return errOKStream |
| } |
| return status.Errorf(code, "RPC terminated due to fault injection") |
| } |
| |
| var errOKStream = errors.New("stream terminated early with OK status") |
| |
| // parseIntFromMD returns the integer in the last header or nil if parsing |
| // failed. |
| func parseIntFromMD(header []string) (int, bool) { |
| if len(header) == 0 { |
| return 0, false |
| } |
| v, err := strconv.Atoi(header[len(header)-1]) |
| return v, err == nil |
| } |
| |
| func splitPct(fp *tpb.FractionalPercent) (num int, den int) { |
| if fp == nil { |
| return 0, 100 |
| } |
| num = int(fp.GetNumerator()) |
| switch fp.GetDenominator() { |
| case tpb.FractionalPercent_HUNDRED: |
| return num, 100 |
| case tpb.FractionalPercent_TEN_THOUSAND: |
| return num, 10 * 1000 |
| case tpb.FractionalPercent_MILLION: |
| return num, 1000 * 1000 |
| } |
| return num, 100 |
| } |
| |
| func grpcFromHTTP(httpStatus int) (codes.Code, bool) { |
| if httpStatus < 200 || httpStatus >= 600 { |
| // Malformed; ignore this fault type. |
| return codes.OK, false |
| } |
| if c := statusMap[httpStatus]; c != codes.OK { |
| // OK = 0/the default for the map. |
| return c, true |
| } |
| // All undefined HTTP status codes convert to Unknown. HTTP status of 200 |
| // is "success", but gRPC converts to Unknown due to missing grpc status. |
| return codes.Unknown, true |
| } |
| |
| func sanitizeGRPCCode(c codes.Code) codes.Code { |
| if c > codes.Code(16) { |
| return codes.Unknown |
| } |
| return c |
| } |
| |
| type okStream struct { |
| ctx context.Context |
| } |
| |
| func (*okStream) Header() (metadata.MD, error) { return nil, nil } |
| func (*okStream) Trailer() metadata.MD { return nil } |
| func (*okStream) CloseSend() error { return nil } |
| func (o *okStream) Context() context.Context { return o.ctx } |
| func (*okStream) SendMsg(m interface{}) error { return io.EOF } |
| func (*okStream) RecvMsg(m interface{}) error { return io.EOF } |