| // Copyright 2021-2023 Buf Technologies, Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package triple_protocol |
| |
| import ( |
| "bufio" |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "math" |
| "net/http" |
| "net/textproto" |
| "runtime" |
| "strconv" |
| "strings" |
| "time" |
| "unicode/utf8" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol/internal/proto/connectext/grpc/status/v1" |
| ) |
| |
// protocol specification headers
//
// This block also holds the envelope flag, the timeout limits, and the
// content types used by the gRPC protocol code in this file.
const (
	grpcHeaderCompression       = "Grpc-Encoding"
	grpcHeaderAcceptCompression = "Grpc-Accept-Encoding"
	grpcHeaderTimeout           = "Grpc-Timeout"
	grpcHeaderStatus            = "Grpc-Status"
	grpcHeaderMessage           = "Grpc-Message"
	grpcHeaderDetails           = "Grpc-Status-Details-Bin"

	// grpcFlagEnvelopeTrailer is the envelope flag bit set on envelopes that
	// carry trailers instead of a message (see MarshalWebTrailers and
	// grpcUnmarshaler.Unmarshal).
	grpcFlagEnvelopeTrailer = 0b10000000

	grpcTimeoutMaxHours = math.MaxInt64 / int64(time.Hour) // how many hours fit into a time.Duration?
	grpcMaxTimeoutChars = 8                                // from gRPC protocol

	// grpcContentTypeDefault is the bare content type; grpcContentTypePrefix is
	// prepended to a codec name to form a codec-specific content type.
	grpcContentTypeDefault = "application/grpc"
	grpcContentTypePrefix  = grpcContentTypeDefault + "+"
)
| |
var (
	// grpcTimeoutUnits pairs each Grpc-Timeout unit suffix with the duration it
	// stands for, ordered from the smallest unit to the largest.
	grpcTimeoutUnits = []struct {
		size time.Duration
		char byte
	}{
		{time.Nanosecond, 'n'},
		{time.Microsecond, 'u'},
		{time.Millisecond, 'm'},
		{time.Second, 'S'},
		{time.Minute, 'M'},
		{time.Hour, 'H'},
	}
	// grpcTimeoutUnitLookup maps a unit suffix byte to its duration. It starts
	// empty and is filled from grpcTimeoutUnits by init.
	grpcTimeoutUnitLookup = make(map[byte]time.Duration)
	// grpcAllowedMethods is the set of HTTP methods reported by
	// grpcHandler.Methods; it contains only POST.
	grpcAllowedMethods = map[string]struct{}{
		http.MethodPost: {},
	}
	// errTrailersWithoutGRPCStatus is the sentinel wrapped by
	// grpcErrorFromTrailer when the Grpc-Status trailer is missing.
	errTrailersWithoutGRPCStatus = fmt.Errorf("gRPC protocol error: no %s trailer", grpcHeaderStatus)

	// defaultGrpcUserAgent follows
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents:
	//
	//	While the protocol does not require a user-agent to function it is recommended
	//	that clients provide a structured user-agent string that provides a basic
	//	description of the calling library, version & platform to facilitate issue diagnosis
	//	in heterogeneous environments. The following structure is recommended to library developers:
	//
	//	User-Agent → "grpc-" Language ?("-" Variant) "/" Version ?( " (" *(AdditionalProperty ";") ")" )
	defaultGrpcUserAgent = fmt.Sprintf("grpc-go-triple/%s (%s)", Version, runtime.Version())
)
| |
| func init() { |
| for _, pair := range grpcTimeoutUnits { |
| grpcTimeoutUnitLookup[pair.char] = pair.size |
| } |
| } |
| |
// protocolGRPC is the stateless entry point for the gRPC protocol: its
// NewHandler and NewClient methods build the server-side and client-side
// protocol implementations.
type protocolGRPC struct{}
| |
| // for server side |
| |
| // NewHandler implements protocol, so it must return an interface. |
| func (g *protocolGRPC) NewHandler(params *protocolHandlerParams) protocolHandler { |
| bare, prefix := grpcContentTypeDefault, grpcContentTypePrefix |
| contentTypes := make(map[string]struct{}) |
| for _, name := range params.Codecs.Names() { |
| contentTypes[canonicalizeContentType(prefix+name)] = struct{}{} |
| } |
| // default codec |
| if params.Codecs.Get(codecNameProto) != nil { |
| contentTypes[bare] = struct{}{} |
| } |
| return &grpcHandler{ |
| protocolHandlerParams: *params, |
| accept: contentTypes, |
| } |
| } |
| |
| // for client side |
| |
| // NewClient implements protocol, so it must return an interface. |
| func (g *protocolGRPC) NewClient(params *protocolClientParams) (protocolClient, error) { |
| peer := newPeerFromURL(params.URL, ProtocolGRPC) |
| return &grpcClient{ |
| protocolClientParams: *params, |
| peer: peer, |
| }, nil |
| } |
| |
// grpcHandler is the server-side gRPC protocol handler built by
// protocolGRPC.NewHandler.
type grpcHandler struct {
	protocolHandlerParams

	// accept is the set of content types this handler serves.
	accept map[string]struct{}
}
| |
// Methods returns the set of HTTP methods the handler accepts. The returned
// map is the shared package-level grpcAllowedMethods, which contains only POST.
func (g *grpcHandler) Methods() map[string]struct{} {
	return grpcAllowedMethods
}
| |
// ContentTypes returns the set of content types the handler accepts. The
// handler's own map is returned, not a copy.
func (g *grpcHandler) ContentTypes() map[string]struct{} {
	return g.accept
}
| |
| func (*grpcHandler) SetTimeout(request *http.Request) (context.Context, context.CancelFunc, error) { |
| timeout, err := grpcParseTimeout(getHeaderCanonical(request.Header, grpcHeaderTimeout)) |
| // not set or great than the threshold |
| if err != nil && !errors.Is(err, errNoTimeout) { |
| // Errors here indicate that the client sent an invalid timeout header, so |
| // the error text is safe to send back. |
| return nil, nil, NewError(CodeInvalidArgument, err) |
| } else if err != nil { |
| // err wraps errNoTimeout, nothing to do. |
| return request.Context(), nil, nil //nolint:nilerr |
| } |
| ctx, cancel := context.WithTimeout(request.Context(), timeout) |
| return ctx, cancel, nil |
| } |
| |
// CanHandlePayload reports whether contentType is one of the content types the
// handler accepts. The request itself is not inspected.
func (g *grpcHandler) CanHandlePayload(request *http.Request, contentType string) bool {
	_, ok := g.accept[contentType]
	return ok
}
| |
| // NewConn is the key function which is responsible for marshal and unmarshal |
| func (g *grpcHandler) NewConn( |
| responseWriter http.ResponseWriter, |
| request *http.Request, |
| ) (handlerConnCloser, bool) { |
| // We need to parse metadata before entering the interceptor stack; we'll |
| // send the error to the client later on. |
| requestCompression, responseCompression, failed := negotiateCompression( |
| g.CompressionPools, |
| getHeaderCanonical(request.Header, grpcHeaderCompression), |
| getHeaderCanonical(request.Header, grpcHeaderAcceptCompression), |
| ) |
| if failed == nil { |
| failed = checkServerStreamsCanFlush(g.Spec, responseWriter) |
| } |
| |
| // keep ready for headers of response |
| |
| // Write any remaining headers here: |
| // (1) any writes to the stream will implicitly send the headers, so we |
| // should get all of gRPC's required response headers ready. |
| // (2) interceptors should be able to see these headers. |
| // |
| // Since we know that these header keys are already in canonical form, we can |
| // skip the normalization in Header.Set. |
| header := responseWriter.Header() |
| header[headerContentType] = []string{getHeaderCanonical(request.Header, headerContentType)} |
| header[grpcHeaderAcceptCompression] = []string{g.CompressionPools.CommaSeparatedNames()} |
| if responseCompression != compressionIdentity { |
| header[grpcHeaderCompression] = []string{responseCompression} |
| } |
| |
| // content-type -> codecName -> codec |
| codecName := grpcCodecFromContentType(getHeaderCanonical(request.Header, headerContentType)) |
| codec := g.Codecs.Get(codecName) // handler.go guarantees this is not nil |
| protocolName := ProtocolGRPC |
| conn := wrapHandlerConnWithCodedErrors(&grpcHandlerConn{ |
| spec: g.Spec, |
| peer: Peer{ |
| Addr: request.RemoteAddr, |
| Protocol: protocolName, |
| }, |
| bufferPool: g.BufferPool, |
| protobuf: g.Codecs.Protobuf(), // for errors |
| marshaler: grpcMarshaler{ |
| envelopeWriter: envelopeWriter{ |
| writer: responseWriter, |
| compressionPool: g.CompressionPools.Get(responseCompression), |
| codec: codec, |
| compressMinBytes: g.CompressMinBytes, |
| bufferPool: g.BufferPool, |
| sendMaxBytes: g.SendMaxBytes, |
| }, |
| }, |
| responseWriter: responseWriter, |
| responseHeader: make(http.Header), |
| responseTrailer: make(http.Header), |
| request: request, |
| unmarshaler: grpcUnmarshaler{ |
| envelopeReader: envelopeReader{ |
| reader: request.Body, |
| codec: codec, |
| compressionPool: g.CompressionPools.Get(requestCompression), |
| bufferPool: g.BufferPool, |
| readMaxBytes: g.ReadMaxBytes, |
| }, |
| }, |
| }) |
| if failed != nil { |
| // Negotiation failed, so we can't establish a stream. |
| _ = conn.Close(failed) |
| return nil, false |
| } |
| return conn, true |
| } |
| |
// grpcClient is the client-side gRPC protocol implementation built by
// protocolGRPC.NewClient.
type grpcClient struct {
	protocolClientParams

	// peer is derived from the client's URL when the client is created.
	peer Peer
}
| |
// Peer returns the peer computed when the client was created.
func (g *grpcClient) Peer() Peer {
	return g.peer
}
| |
| func (g *grpcClient) WriteRequestHeader(_ StreamType, header http.Header) { |
| // We know these header keys are in canonical form, so we can bypass all the |
| // checks in Header.Set. |
| if getHeaderCanonical(header, headerUserAgent) == "" { |
| header[headerUserAgent] = []string{defaultGrpcUserAgent} |
| } |
| header[headerContentType] = []string{grpcContentTypeFromCodecName(g.Codec.Name())} |
| // gRPC handles compression on a per-message basis, so we don't want to |
| // compress the whole stream. By default, http.Client will ask the server |
| // to gzip the stream if we don't set Accept-Encoding. |
| header["Accept-Encoding"] = []string{compressionIdentity} |
| if g.CompressionName != "" && g.CompressionName != compressionIdentity { |
| header[grpcHeaderCompression] = []string{g.CompressionName} |
| } |
| if acceptCompression := g.CompressionPools.CommaSeparatedNames(); acceptCompression != "" { |
| header[grpcHeaderAcceptCompression] = []string{acceptCompression} |
| } |
| // The gRPC-HTTP2 specification requires this - it flushes out proxies that |
| // don't support HTTP trailers. |
| header["Te"] = []string{"trailers"} |
| } |
| |
| func (g *grpcClient) NewConn( |
| ctx context.Context, |
| spec Spec, |
| header http.Header, |
| ) StreamingClientConn { |
| if deadline, ok := ctx.Deadline(); ok { |
| if encodedDeadline, err := grpcEncodeTimeout(time.Until(deadline)); err == nil { |
| // Tests verify that the error in encodeTimeout is unreachable, so we |
| // don't need to handle the error case. |
| header[grpcHeaderTimeout] = []string{encodedDeadline} |
| } |
| } |
| duplexCall := newDuplexHTTPCall( |
| ctx, |
| g.HTTPClient, |
| g.URL, |
| spec, |
| header, |
| ) |
| conn := &grpcClientConn{ |
| spec: spec, |
| peer: g.Peer(), |
| duplexCall: duplexCall, |
| compressionPools: g.CompressionPools, |
| bufferPool: g.BufferPool, |
| protobuf: g.Protobuf, |
| marshaler: grpcMarshaler{ |
| envelopeWriter: envelopeWriter{ |
| writer: duplexCall, |
| compressionPool: g.CompressionPools.Get(g.CompressionName), |
| codec: g.Codec, |
| compressMinBytes: g.CompressMinBytes, |
| bufferPool: g.BufferPool, |
| sendMaxBytes: g.SendMaxBytes, |
| }, |
| }, |
| unmarshaler: grpcUnmarshaler{ |
| envelopeReader: envelopeReader{ |
| reader: duplexCall, |
| codec: g.Codec, |
| bufferPool: g.BufferPool, |
| readMaxBytes: g.ReadMaxBytes, |
| }, |
| }, |
| responseHeader: make(http.Header), |
| responseTrailer: make(http.Header), |
| } |
| duplexCall.SetValidateResponse(conn.validateResponse) |
| conn.readTrailers = func(_ *grpcUnmarshaler, call *duplexHTTPCall) http.Header { |
| // To access HTTP trailers, we need to read the body to EOF. |
| _ = discard(call) |
| return call.ResponseTrailer() |
| } |
| return wrapClientConnWithCodedErrors(conn) |
| } |
| |
// grpcClientConn works for both gRPC and gRPC-Web.
type grpcClientConn struct {
	spec             Spec
	peer             Peer
	duplexCall       *duplexHTTPCall
	compressionPools readOnlyCompressionPools
	bufferPool       *bufferPool
	protobuf         Codec // for errors
	marshaler        grpcMarshaler
	unmarshaler      grpcUnmarshaler
	// responseHeader and responseTrailer are populated by validateResponse
	// and Receive.
	responseHeader  http.Header
	responseTrailer http.Header
	// readTrailers fetches the trailers once Receive can no longer produce a
	// message.
	readTrailers func(*grpcUnmarshaler, *duplexHTTPCall) http.Header
}
| |
// Spec returns the spec this connection was created with.
func (cc *grpcClientConn) Spec() Spec {
	return cc.spec
}
| |
// Peer returns the peer this connection was created with.
func (cc *grpcClientConn) Peer() Peer {
	return cc.peer
}
| |
// Send hands msg to the connection's marshaler and returns the marshaler's
// error, if any.
func (cc *grpcClientConn) Send(msg interface{}) error {
	if err := cc.marshaler.Marshal(msg); err != nil {
		return err
	}
	return nil // must be a literal nil: nil *Error is a non-nil error
}
| |
// RequestHeader returns the header of the underlying duplex HTTP call.
func (cc *grpcClientConn) RequestHeader() http.Header {
	return cc.duplexCall.Header()
}
| |
// CloseRequest closes the write side of the underlying duplex HTTP call.
func (cc *grpcClientConn) CloseRequest() error {
	return cc.duplexCall.CloseWrite()
}
| |
| func (cc *grpcClientConn) Receive(msg interface{}) error { |
| cc.duplexCall.BlockUntilResponseReady() |
| err := cc.unmarshaler.Unmarshal(msg) |
| if err == nil { |
| return nil |
| } |
| if getHeaderCanonical(cc.responseHeader, grpcHeaderStatus) != "" { |
| // We got what gRPC calls a trailers-only response, which puts the trailing |
| // metadata (including errors) into HTTP headers. validateResponse has |
| // already extracted the error. |
| return err |
| } |
| // See if the server sent an explicit error in the HTTP or gRPC-Web trailers. |
| mergeHeaders( |
| cc.responseTrailer, |
| cc.readTrailers(&cc.unmarshaler, cc.duplexCall), |
| ) |
| serverErr := grpcErrorFromTrailer(cc.bufferPool, cc.protobuf, cc.responseTrailer) |
| if serverErr != nil && (errors.Is(err, io.EOF) || !errors.Is(serverErr, errTrailersWithoutGRPCStatus)) { |
| // We've either: |
| // - Cleanly read until the end of the response body and *not* received |
| // gRPC status trailers, which is a protocol error, or |
| // - Received an explicit error from the server. |
| // |
| // This is expected from a protocol perspective, but receiving trailers |
| // means that we're _not_ getting a message. For users to realize that |
| // the stream has ended, Receive must return an error. |
| serverErr.meta = cc.responseHeader.Clone() |
| mergeHeaders(serverErr.meta, cc.responseTrailer) |
| cc.duplexCall.SetError(serverErr) |
| return serverErr |
| } |
| // This was probably an error converting the bytes to a message or an error |
| // reading from the network. We're going to return it to the |
| // user, but we also want to setResponseError so Send errors out. |
| cc.duplexCall.SetError(err) |
| return err |
| } |
| |
// ResponseHeader blocks until the response is ready and then returns the
// connection's response headers.
func (cc *grpcClientConn) ResponseHeader() http.Header {
	cc.duplexCall.BlockUntilResponseReady()
	return cc.responseHeader
}
| |
// ResponseTrailer blocks until the response is ready and then returns the
// connection's response trailers.
func (cc *grpcClientConn) ResponseTrailer() http.Header {
	cc.duplexCall.BlockUntilResponseReady()
	return cc.responseTrailer
}
| |
// CloseResponse closes the read side of the underlying duplex HTTP call.
func (cc *grpcClientConn) CloseResponse() error {
	return cc.duplexCall.CloseRead()
}
| |
| func (cc *grpcClientConn) validateResponse(response *http.Response) *Error { |
| if err := grpcValidateResponse( |
| response, |
| cc.responseHeader, |
| cc.responseTrailer, |
| cc.compressionPools, |
| cc.bufferPool, |
| cc.protobuf, |
| ); err != nil { |
| return err |
| } |
| compression := getHeaderCanonical(response.Header, grpcHeaderCompression) |
| cc.unmarshaler.envelopeReader.compressionPool = cc.compressionPools.Get(compression) |
| return nil |
| } |
| |
// grpcHandlerConn is the server-side connection for a single request. Its
// marshaler and unmarshaler are configured with the codec and compression
// chosen for that request.
type grpcHandlerConn struct {
	spec Spec
	peer Peer
	// web selects the gRPC-Web trailer handling in Close. It is never set in
	// the handler construction visible in this file.
	web            bool
	bufferPool     *bufferPool
	protobuf       Codec // for errors
	marshaler      grpcMarshaler
	responseWriter http.ResponseWriter
	// responseHeader and responseTrailer are the user-visible maps; they are
	// copied to the response writer by Send and Close.
	responseHeader  http.Header
	responseTrailer http.Header
	// wroteToBody is set by the first Send.
	wroteToBody bool
	request     *http.Request
	unmarshaler grpcUnmarshaler
}
| |
// Spec returns the spec this connection was created with.
func (hc *grpcHandlerConn) Spec() Spec {
	return hc.spec
}
| |
// Peer returns the peer this connection was created with.
func (hc *grpcHandlerConn) Peer() Peer {
	return hc.peer
}
| |
// Receive delegates reading and unmarshaling of the next request message to
// the connection's unmarshaler.
func (hc *grpcHandlerConn) Receive(msg interface{}) error {
	if err := hc.unmarshaler.Unmarshal(msg); err != nil {
		return err // already coded
	}
	return nil // must be a literal nil: nil *Error is a non-nil error
}
| |
// RequestHeader returns the headers of the incoming HTTP request.
func (hc *grpcHandlerConn) RequestHeader() http.Header {
	return hc.request.Header
}
| |
// Send marshals msg to the response. Before the first message, the
// user-visible response headers are merged into the response writer's
// headers. flushResponseWriter is called on the response writer when Send
// returns.
func (hc *grpcHandlerConn) Send(msg interface{}) error {
	defer flushResponseWriter(hc.responseWriter)
	if !hc.wroteToBody {
		mergeHeaders(hc.responseWriter.Header(), hc.responseHeader)
		hc.wroteToBody = true
	}
	if err := hc.marshaler.Marshal(msg); err != nil {
		return err
	}
	return nil // must be a literal nil: nil *Error is a non-nil error
}
| |
// ResponseHeader returns the user-visible response headers. They are merged
// into the response writer's headers by the first Send or by Close.
func (hc *grpcHandlerConn) ResponseHeader() http.Header {
	return hc.responseHeader
}
| |
// ResponseTrailer returns the user-visible response trailers. Close copies
// them into the trailers it sends.
func (hc *grpcHandlerConn) ResponseTrailer() http.Header {
	return hc.responseTrailer
}
| |
// Close finishes the response. It encodes err (nil means OK) into gRPC status
// trailers, merges them with a copy of the user-visible trailers, and sends
// the result in the form required by the situation: as HTTP headers or an
// in-body trailer envelope for gRPC-Web, or as HTTP trailers for standard
// gRPC. The request body is closed on return; its close error is returned
// only when nothing else failed.
func (hc *grpcHandlerConn) Close(err error) (retErr error) {
	defer func() {
		// We don't want to copy unread portions of the body to /dev/null here: if
		// the client hasn't closed the request body, we'll block until the server
		// timeout kicks in. This could happen because the client is malicious, but
		// a well-intentioned client may just not expect the server to be returning
		// an error for a streaming RPC. Better to accept that we can't always reuse
		// TCP connections.
		closeErr := hc.request.Body.Close()
		if retErr == nil {
			retErr = closeErr
		}
	}()
	defer flushResponseWriter(hc.responseWriter)
	// If we haven't written the headers yet, do so.
	if !hc.wroteToBody {
		mergeHeaders(hc.responseWriter.Header(), hc.responseHeader)
	}
	// gRPC always sends the error's code, message, details, and metadata as
	// trailing metadata. The Triple protocol doesn't do this, so we don't want
	// to mutate the trailers map that the user sees.
	mergedTrailers := make(
		http.Header,
		len(hc.responseTrailer)+2, // always make space for status & message
	)
	mergeHeaders(mergedTrailers, hc.responseTrailer)
	grpcErrorToTrailer(hc.bufferPool, mergedTrailers, hc.protobuf, err)
	if hc.web && !hc.wroteToBody {
		// We're using gRPC-Web and we haven't yet written to the body. Since we're
		// not sending any response messages, the gRPC specification calls this a
		// "trailers-only" response. Under those circumstances, the gRPC-Web spec
		// says that implementations _may_ send trailing metadata as HTTP headers
		// instead. Envoy is the canonical implementation of the gRPC-Web protocol,
		// so we emulate Envoy's behavior and put the trailing metadata in the HTTP
		// headers.
		mergeHeaders(hc.responseWriter.Header(), mergedTrailers)
		return nil
	}
	if hc.web {
		// We're using gRPC-Web and we've already sent the headers, so we write
		// trailing metadata to the HTTP body.
		if err := hc.marshaler.MarshalWebTrailers(mergedTrailers); err != nil {
			return err
		}
		return nil // must be a literal nil: nil *Error is a non-nil error
	}
	// We're using standard gRPC. Even if we haven't written to the body and
	// we're sending a "trailers-only" response, we must send trailing metadata
	// as HTTP trailers. (If we had frame-level control of the HTTP/2 layer, we
	// could send trailers-only responses as a single HEADER frame and no DATA
	// frames, but net/http doesn't expose APIs that low-level.)
	if !hc.wroteToBody {
		// This block works around a bug in x/net/http2. Until Go 1.20, trailers
		// written using http.TrailerPrefix were only sent if either (1) there's
		// data in the body, or (2) the innermost http.ResponseWriter is flushed.
		// To ensure that we always send a valid gRPC response, even if the user
		// has wrapped the response writer in net/http middleware that doesn't
		// implement http.Flusher, we must pre-declare our HTTP trailers. We can
		// remove this when Go 1.21 ships and we drop support for Go 1.19.
		for key := range mergedTrailers {
			addHeaderCanonical(hc.responseWriter.Header(), headerTrailer, key)
		}
		hc.responseWriter.WriteHeader(http.StatusOK)
		for key, values := range mergedTrailers {
			for _, value := range values {
				// These are potentially user-supplied, so we can't assume they're in
				// canonical form. Don't use addHeaderCanonical.
				hc.responseWriter.Header().Add(key, value)
			}
		}
		return nil
	}
	// In net/http's ResponseWriter API, we send HTTP trailers by writing to the
	// headers map with a special prefix. This prefixing is an implementation
	// detail, so we should hide it and _not_ mutate the user-visible headers.
	//
	// Note that this is _very_ finicky and difficult to test with net/http,
	// since correctness depends on low-level framing details. Breaking this
	// logic breaks Envoy's gRPC-Web translation.
	for key, values := range mergedTrailers {
		for _, value := range values {
			// These are potentially user-supplied, so we can't assume they're in
			// canonical form. Don't use addHeaderCanonical.
			hc.responseWriter.Header().Add(http.TrailerPrefix+key, value)
		}
	}
	return nil
}
| |
// grpcMarshaler is an envelopeWriter extended with MarshalWebTrailers, which
// writes trailers into the body as a flagged envelope.
type grpcMarshaler struct {
	envelopeWriter
}
| |
| func (m *grpcMarshaler) MarshalWebTrailers(trailer http.Header) *Error { |
| raw := m.envelopeWriter.bufferPool.Get() |
| defer m.envelopeWriter.bufferPool.Put(raw) |
| for key, values := range trailer { |
| // Per the Go specification, keys inserted during iteration may be produced |
| // later in the iteration or may be skipped. For safety, avoid mutating the |
| // map if the key is already lower-cased. |
| lower := strings.ToLower(key) |
| if key == lower { |
| continue |
| } |
| delete(trailer, key) |
| trailer[lower] = values |
| } |
| if err := trailer.Write(raw); err != nil { |
| return errorf(CodeInternal, "format trailers: %w", err) |
| } |
| return m.Write(&envelope{ |
| Data: raw, |
| Flags: grpcFlagEnvelopeTrailer, |
| }) |
| } |
| |
// grpcUnmarshaler reads messages through an envelopeReader and captures
// trailers that arrive in the body as a flagged envelope.
type grpcUnmarshaler struct {
	envelopeReader envelopeReader
	// webTrailer holds the trailers parsed from a trailer envelope by
	// Unmarshal.
	webTrailer http.Header
}
| |
| func (u *grpcUnmarshaler) Unmarshal(message interface{}) *Error { |
| // delegate read packet and unmarshal processes to envelopeReader |
| err := u.envelopeReader.Unmarshal(message) |
| if err == nil { |
| return nil |
| } |
| if !errors.Is(err, errSpecialEnvelope) { |
| return err |
| } |
| // for special envelope |
| env := u.envelopeReader.last |
| // for non-web grpc, last envelope needs to set grpcFlagEnvelopeTrailer |
| if !env.IsSet(grpcFlagEnvelopeTrailer) { |
| return errorf(CodeInternal, "protocol error: invalid envelope flags %d", env.Flags) |
| } |
| |
| // Per the gRPC-Web specification, trailers should be encoded as an HTTP/1 |
| // headers block _without_ the terminating newline. To make the headers |
| // parseable by net/textproto, we need to add the newline. |
| if err := env.Data.WriteByte('\n'); err != nil { |
| return errorf(CodeInternal, "unmarshal web trailers: %w", err) |
| } |
| bufferedReader := bufio.NewReader(env.Data) |
| mimeReader := textproto.NewReader(bufferedReader) |
| mimeHeader, mimeErr := mimeReader.ReadMIMEHeader() |
| if mimeErr != nil { |
| return errorf( |
| CodeInternal, |
| "gRPC-Web protocol error: trailers invalid: %w", |
| mimeErr, |
| ) |
| } |
| u.webTrailer = http.Header(mimeHeader) |
| return errSpecialEnvelope |
| } |
| |
// WebTrailer returns the trailers parsed from the most recent trailer
// envelope, or nil if Unmarshal has not parsed one.
func (u *grpcUnmarshaler) WebTrailer() http.Header {
	return u.webTrailer
}
| |
| func grpcValidateResponse( |
| response *http.Response, |
| header, trailer http.Header, |
| availableCompressors readOnlyCompressionPools, |
| bufferPool *bufferPool, |
| protobuf Codec, |
| ) *Error { |
| if response.StatusCode != http.StatusOK { |
| return errorf(grpcHTTPToCode(response.StatusCode), "HTTP status %v", response.Status) |
| } |
| if compression := getHeaderCanonical(response.Header, grpcHeaderCompression); compression != "" && |
| compression != compressionIdentity && |
| !availableCompressors.Contains(compression) { |
| // Per https://github.com/grpc/grpc/blob/master/doc/compression.md, we |
| // should return CodeInternal and specify acceptable compression(s) (in |
| // addition to setting the Grpc-Accept-Encoding header). |
| return errorf( |
| CodeInternal, |
| "unknown encoding %q: accepted encodings are %v", |
| compression, |
| availableCompressors.CommaSeparatedNames(), |
| ) |
| } |
| // When there's no body, gRPC and gRPC-Web servers may send error information |
| // in the HTTP headers. |
| if err := grpcErrorFromTrailer( |
| bufferPool, |
| protobuf, |
| response.Header, |
| ); err != nil && !errors.Is(err, errTrailersWithoutGRPCStatus) { |
| // Per the specification, only the HTTP status code and Content-Type should |
| // be treated as headers. The rest should be treated as trailing metadata. |
| if contentType := getHeaderCanonical(response.Header, headerContentType); contentType != "" { |
| setHeaderCanonical(header, headerContentType, contentType) |
| } |
| mergeHeaders(trailer, response.Header) |
| delHeaderCanonical(trailer, headerContentType) |
| // Also set the error metadata |
| err.meta = header.Clone() |
| mergeHeaders(err.meta, trailer) |
| return err |
| } |
| // The response is valid, so we should expose the headers. |
| mergeHeaders(header, response.Header) |
| return nil |
| } |
| |
| func grpcHTTPToCode(httpCode int) Code { |
| // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md |
| // Note that this is not just the inverse of the gRPC-to-HTTP mapping. |
| switch httpCode { |
| case 400: |
| return CodeInternal |
| case 401: |
| return CodeUnauthenticated |
| case 403: |
| return CodePermissionDenied |
| case 404: |
| return CodeUnimplemented |
| case 429: |
| return CodeUnavailable |
| case 502, 503, 504: |
| return CodeUnavailable |
| default: |
| return CodeUnknown |
| } |
| } |
| |
| // The gRPC wire protocol specifies that errors should be serialized using the |
| // binary Protobuf format, even if the messages in the request/response stream |
| // use a different codec. Consequently, this function needs a Protobuf codec to |
| // unmarshal error information in the headers. |
| func grpcErrorFromTrailer(bufferPool *bufferPool, protobuf Codec, trailer http.Header) *Error { |
| codeHeader := getHeaderCanonical(trailer, grpcHeaderStatus) |
| if codeHeader == "" { |
| return NewError(CodeInternal, errTrailersWithoutGRPCStatus) |
| } |
| if codeHeader == "0" { |
| return nil |
| } |
| |
| code, err := strconv.ParseUint(codeHeader, 10 /* base */, 32 /* bitsize */) |
| if err != nil { |
| return errorf(CodeInternal, "gRPC protocol error: invalid error code %q", codeHeader) |
| } |
| message := grpcPercentDecode(bufferPool, getHeaderCanonical(trailer, grpcHeaderMessage)) |
| retErr := NewWireError(Code(code), errors.New(message)) |
| |
| detailsBinaryEncoded := getHeaderCanonical(trailer, grpcHeaderDetails) |
| if len(detailsBinaryEncoded) > 0 { |
| detailsBinary, err := DecodeBinaryHeader(detailsBinaryEncoded) |
| if err != nil { |
| return errorf(CodeInternal, "server returned invalid grpc-status-details-bin trailer: %w", err) |
| } |
| var status statusv1.Status |
| if err := protobuf.Unmarshal(detailsBinary, &status); err != nil { |
| return errorf(CodeInternal, "server returned invalid protobuf for error details: %w", err) |
| } |
| for _, d := range status.Details { |
| retErr.details = append(retErr.details, &ErrorDetail{pb: d}) |
| } |
| // Prefer the Protobuf-encoded data to the headers (grpc-go does this too). |
| retErr.code = Code(status.Code) |
| retErr.err = errors.New(status.Message) |
| } |
| |
| return retErr |
| } |
| |
| func grpcParseTimeout(timeout string) (time.Duration, error) { |
| if timeout == "" { |
| return 0, errNoTimeout |
| } |
| unit, ok := grpcTimeoutUnitLookup[timeout[len(timeout)-1]] |
| if !ok { |
| return 0, fmt.Errorf("gRPC protocol error: timeout %q has invalid unit", timeout) |
| } |
| num, err := strconv.ParseInt(timeout[:len(timeout)-1], 10 /* base */, 64 /* bitsize */) |
| if err != nil || num < 0 { |
| return 0, fmt.Errorf("gRPC protocol error: invalid timeout %q", timeout) |
| } |
| if num > 99999999 { // timeout must be ASCII string of at most 8 digits |
| return 0, fmt.Errorf("gRPC protocol error: timeout %q is too long", timeout) |
| } |
| if unit == time.Hour && num > grpcTimeoutMaxHours { |
| // Timeout is effectively unbounded, so ignore it. The grpc-go |
| // implementation does the same thing. |
| return 0, errNoTimeout |
| } |
| return time.Duration(num) * unit, nil |
| } |
| |
| func grpcEncodeTimeout(timeout time.Duration) (string, error) { |
| if timeout <= 0 { |
| return "0n", nil |
| } |
| for _, pair := range grpcTimeoutUnits { |
| digits := strconv.FormatInt(int64(timeout/pair.size), 10 /* base */) |
| if len(digits) < grpcMaxTimeoutChars { |
| return digits + string(pair.char), nil |
| } |
| } |
| // The max time.Duration is smaller than the maximum expressible gRPC |
| // timeout, so we can't reach this case. |
| return "", errNoTimeout |
| } |
| |
| func grpcCodecFromContentType(contentType string) string { |
| if contentType == grpcContentTypeDefault { |
| // implicitly protobuf |
| return codecNameProto |
| } |
| |
| return strings.TrimPrefix(contentType, grpcContentTypePrefix) |
| } |
| |
// grpcContentTypeFromCodecName returns the content type for a codec name by
// prepending "application/grpc+" to it.
func grpcContentTypeFromCodecName(name string) string {
	return grpcContentTypePrefix + name
}
| |
| func grpcErrorToTrailer(bufferPool *bufferPool, trailer http.Header, protobuf Codec, err error) { |
| if err == nil { |
| setHeaderCanonical(trailer, grpcHeaderStatus, "0") // zero is the gRPC OK status |
| setHeaderCanonical(trailer, grpcHeaderMessage, "") |
| return |
| } |
| status := grpcStatusFromError(err) |
| code := strconv.Itoa(int(status.Code)) |
| bin, binErr := protobuf.Marshal(status) |
| if binErr != nil { |
| setHeaderCanonical( |
| trailer, |
| grpcHeaderStatus, |
| strconv.FormatInt(int64(CodeInternal), 10 /* base */), |
| ) |
| setHeaderCanonical( |
| trailer, |
| grpcHeaderMessage, |
| grpcPercentEncode( |
| bufferPool, |
| fmt.Sprintf("marshal protobuf status: %v", binErr), |
| ), |
| ) |
| return |
| } |
| if tripleErr, ok := asError(err); ok { |
| mergeHeaders(trailer, tripleErr.meta) |
| } |
| setHeaderCanonical(trailer, grpcHeaderStatus, code) |
| setHeaderCanonical(trailer, grpcHeaderMessage, grpcPercentEncode(bufferPool, status.Message)) |
| setHeaderCanonical(trailer, grpcHeaderDetails, EncodeBinaryHeader(bin)) |
| } |
| |
| func grpcStatusFromError(err error) *statusv1.Status { |
| status := &statusv1.Status{ |
| Code: int32(CodeUnknown), |
| Message: err.Error(), |
| } |
| if tripleErr, ok := asError(err); ok { |
| status.Code = int32(tripleErr.Code()) |
| status.Message = tripleErr.Message() |
| status.Details = tripleErr.detailsAsAny() |
| } |
| return status |
| } |
| |
| // grpcPercentEncode follows RFC 3986 Section 2.1 and the gRPC HTTP/2 spec. |
| // It's a variant of URL-encoding with fewer reserved characters. It's intended |
| // to take UTF-8 encoded text and escape non-ASCII bytes so that they're valid |
| // HTTP/1 headers, while still maximizing readability of the data on the wire. |
| // |
| // The grpc-message trailer (used for human-readable error messages) should be |
| // percent-encoded. |
| // |
| // References: |
| // |
| // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses |
| // https://datatracker.ietf.org/doc/html/rfc3986#section-2.1 |
| func grpcPercentEncode(bufferPool *bufferPool, msg string) string { |
| for i := 0; i < len(msg); i++ { |
| // Characters that need to be escaped are defined in gRPC's HTTP/2 spec. |
| // They're different from the generic set defined in RFC 3986. |
| if c := msg[i]; c < ' ' || c > '~' || c == '%' { |
| return grpcPercentEncodeSlow(bufferPool, msg, i) |
| } |
| } |
| return msg |
| } |
| |
| // msg needs some percent-escaping. Bytes before offset don't require |
| // percent-encoding, so they can be copied to the output as-is. |
| func grpcPercentEncodeSlow(bufferPool *bufferPool, msg string, offset int) string { |
| out := bufferPool.Get() |
| defer bufferPool.Put(out) |
| out.WriteString(msg[:offset]) |
| for i := offset; i < len(msg); i++ { |
| c := msg[i] |
| if c < ' ' || c > '~' || c == '%' { |
| out.WriteString(fmt.Sprintf("%%%02X", c)) |
| continue |
| } |
| out.WriteByte(c) |
| } |
| return out.String() |
| } |
| |
| func grpcPercentDecode(bufferPool *bufferPool, encoded string) string { |
| for i := 0; i < len(encoded); i++ { |
| if c := encoded[i]; c == '%' && i+2 < len(encoded) { |
| return grpcPercentDecodeSlow(bufferPool, encoded, i) |
| } |
| } |
| return encoded |
| } |
| |
| // Similar to percentEncodeSlow: encoded is percent-encoded, and needs to be |
| // decoded byte-by-byte starting at offset. |
| func grpcPercentDecodeSlow(bufferPool *bufferPool, encoded string, offset int) string { |
| out := bufferPool.Get() |
| defer bufferPool.Put(out) |
| out.WriteString(encoded[:offset]) |
| for i := offset; i < len(encoded); i++ { |
| c := encoded[i] |
| if c != '%' || i+2 >= len(encoded) { |
| out.WriteByte(c) |
| continue |
| } |
| parsed, err := strconv.ParseUint(encoded[i+1:i+3], 16 /* hex */, 8 /* bitsize */) |
| if err != nil { |
| out.WriteRune(utf8.RuneError) |
| } else { |
| out.WriteByte(byte(parsed)) |
| } |
| i += 2 |
| } |
| return out.String() |
| } |