blob: 80dff3864ed50a32e24a69d51f0f03674da65bbd [file] [log] [blame]
// Copyright 2021-2023 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package triple_protocol
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"math"
"net/http"
"net/textproto"
"runtime"
"strconv"
"strings"
"time"
"unicode/utf8"
)
import (
"dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol/internal/proto/connectext/grpc/status/v1"
)
// protocol specification headers
const (
grpcHeaderCompression = "Grpc-Encoding"
grpcHeaderAcceptCompression = "Grpc-Accept-Encoding"
grpcHeaderTimeout = "Grpc-Timeout"
grpcHeaderStatus = "Grpc-Status"
grpcHeaderMessage = "Grpc-Message"
grpcHeaderDetails = "Grpc-Status-Details-Bin"
grpcFlagEnvelopeTrailer = 0b10000000
grpcTimeoutMaxHours = math.MaxInt64 / int64(time.Hour) // how many hours fit into a time.Duration?
grpcMaxTimeoutChars = 8 // from gRPC protocol
grpcContentTypeDefault = "application/grpc"
grpcContentTypePrefix = grpcContentTypeDefault + "+"
)
var (
grpcTimeoutUnits = []struct {
size time.Duration
char byte
}{
{time.Nanosecond, 'n'},
{time.Microsecond, 'u'},
{time.Millisecond, 'm'},
{time.Second, 'S'},
{time.Minute, 'M'},
{time.Hour, 'H'},
}
grpcTimeoutUnitLookup = make(map[byte]time.Duration)
grpcAllowedMethods = map[string]struct{}{
http.MethodPost: {},
}
errTrailersWithoutGRPCStatus = fmt.Errorf("gRPC protocol error: no %s trailer", grpcHeaderStatus)
// defaultGrpcUserAgent follows
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#user-agents:
//
// While the protocol does not require a user-agent to function it is recommended
// that clients provide a structured user-agent string that provides a basic
// description of the calling library, version & platform to facilitate issue diagnosis
// in heterogeneous environments. The following structure is recommended to library developers:
//
// User-Agent → "grpc-" Language ?("-" Variant) "/" Version ?( " (" *(AdditionalProperty ";") ")" )
defaultGrpcUserAgent = fmt.Sprintf("grpc-go-triple/%s (%s)", Version, runtime.Version())
)
func init() {
for _, pair := range grpcTimeoutUnits {
grpcTimeoutUnitLookup[pair.char] = pair.size
}
}
type protocolGRPC struct{}
// for server side
// NewHandler implements protocol, so it must return an interface.
func (g *protocolGRPC) NewHandler(params *protocolHandlerParams) protocolHandler {
bare, prefix := grpcContentTypeDefault, grpcContentTypePrefix
contentTypes := make(map[string]struct{})
for _, name := range params.Codecs.Names() {
contentTypes[canonicalizeContentType(prefix+name)] = struct{}{}
}
// default codec
if params.Codecs.Get(codecNameProto) != nil {
contentTypes[bare] = struct{}{}
}
return &grpcHandler{
protocolHandlerParams: *params,
accept: contentTypes,
}
}
// for client side
// NewClient implements protocol, so it must return an interface.
func (g *protocolGRPC) NewClient(params *protocolClientParams) (protocolClient, error) {
peer := newPeerFromURL(params.URL, ProtocolGRPC)
return &grpcClient{
protocolClientParams: *params,
peer: peer,
}, nil
}
type grpcHandler struct {
protocolHandlerParams
accept map[string]struct{}
}
func (g *grpcHandler) Methods() map[string]struct{} {
return grpcAllowedMethods
}
func (g *grpcHandler) ContentTypes() map[string]struct{} {
return g.accept
}
func (*grpcHandler) SetTimeout(request *http.Request) (context.Context, context.CancelFunc, error) {
timeout, err := grpcParseTimeout(getHeaderCanonical(request.Header, grpcHeaderTimeout))
// not set or great than the threshold
if err != nil && !errors.Is(err, errNoTimeout) {
// Errors here indicate that the client sent an invalid timeout header, so
// the error text is safe to send back.
return nil, nil, NewError(CodeInvalidArgument, err)
} else if err != nil {
// err wraps errNoTimeout, nothing to do.
return request.Context(), nil, nil //nolint:nilerr
}
ctx, cancel := context.WithTimeout(request.Context(), timeout)
return ctx, cancel, nil
}
func (g *grpcHandler) CanHandlePayload(request *http.Request, contentType string) bool {
_, ok := g.accept[contentType]
return ok
}
// NewConn is the key function which is responsible for marshal and unmarshal
func (g *grpcHandler) NewConn(
responseWriter http.ResponseWriter,
request *http.Request,
) (handlerConnCloser, bool) {
// We need to parse metadata before entering the interceptor stack; we'll
// send the error to the client later on.
requestCompression, responseCompression, failed := negotiateCompression(
g.CompressionPools,
getHeaderCanonical(request.Header, grpcHeaderCompression),
getHeaderCanonical(request.Header, grpcHeaderAcceptCompression),
)
if failed == nil {
failed = checkServerStreamsCanFlush(g.Spec, responseWriter)
}
// keep ready for headers of response
// Write any remaining headers here:
// (1) any writes to the stream will implicitly send the headers, so we
// should get all of gRPC's required response headers ready.
// (2) interceptors should be able to see these headers.
//
// Since we know that these header keys are already in canonical form, we can
// skip the normalization in Header.Set.
header := responseWriter.Header()
header[headerContentType] = []string{getHeaderCanonical(request.Header, headerContentType)}
header[grpcHeaderAcceptCompression] = []string{g.CompressionPools.CommaSeparatedNames()}
if responseCompression != compressionIdentity {
header[grpcHeaderCompression] = []string{responseCompression}
}
// content-type -> codecName -> codec
codecName := grpcCodecFromContentType(getHeaderCanonical(request.Header, headerContentType))
codec := g.Codecs.Get(codecName) // handler.go guarantees this is not nil
protocolName := ProtocolGRPC
conn := wrapHandlerConnWithCodedErrors(&grpcHandlerConn{
spec: g.Spec,
peer: Peer{
Addr: request.RemoteAddr,
Protocol: protocolName,
},
bufferPool: g.BufferPool,
protobuf: g.Codecs.Protobuf(), // for errors
marshaler: grpcMarshaler{
envelopeWriter: envelopeWriter{
writer: responseWriter,
compressionPool: g.CompressionPools.Get(responseCompression),
codec: codec,
compressMinBytes: g.CompressMinBytes,
bufferPool: g.BufferPool,
sendMaxBytes: g.SendMaxBytes,
},
},
responseWriter: responseWriter,
responseHeader: make(http.Header),
responseTrailer: make(http.Header),
request: request,
unmarshaler: grpcUnmarshaler{
envelopeReader: envelopeReader{
reader: request.Body,
codec: codec,
compressionPool: g.CompressionPools.Get(requestCompression),
bufferPool: g.BufferPool,
readMaxBytes: g.ReadMaxBytes,
},
},
})
if failed != nil {
// Negotiation failed, so we can't establish a stream.
_ = conn.Close(failed)
return nil, false
}
return conn, true
}
type grpcClient struct {
protocolClientParams
peer Peer
}
func (g *grpcClient) Peer() Peer {
return g.peer
}
func (g *grpcClient) WriteRequestHeader(_ StreamType, header http.Header) {
// We know these header keys are in canonical form, so we can bypass all the
// checks in Header.Set.
if getHeaderCanonical(header, headerUserAgent) == "" {
header[headerUserAgent] = []string{defaultGrpcUserAgent}
}
header[headerContentType] = []string{grpcContentTypeFromCodecName(g.Codec.Name())}
// gRPC handles compression on a per-message basis, so we don't want to
// compress the whole stream. By default, http.Client will ask the server
// to gzip the stream if we don't set Accept-Encoding.
header["Accept-Encoding"] = []string{compressionIdentity}
if g.CompressionName != "" && g.CompressionName != compressionIdentity {
header[grpcHeaderCompression] = []string{g.CompressionName}
}
if acceptCompression := g.CompressionPools.CommaSeparatedNames(); acceptCompression != "" {
header[grpcHeaderAcceptCompression] = []string{acceptCompression}
}
// The gRPC-HTTP2 specification requires this - it flushes out proxies that
// don't support HTTP trailers.
header["Te"] = []string{"trailers"}
}
func (g *grpcClient) NewConn(
ctx context.Context,
spec Spec,
header http.Header,
) StreamingClientConn {
if deadline, ok := ctx.Deadline(); ok {
if encodedDeadline, err := grpcEncodeTimeout(time.Until(deadline)); err == nil {
// Tests verify that the error in encodeTimeout is unreachable, so we
// don't need to handle the error case.
header[grpcHeaderTimeout] = []string{encodedDeadline}
}
}
duplexCall := newDuplexHTTPCall(
ctx,
g.HTTPClient,
g.URL,
spec,
header,
)
conn := &grpcClientConn{
spec: spec,
peer: g.Peer(),
duplexCall: duplexCall,
compressionPools: g.CompressionPools,
bufferPool: g.BufferPool,
protobuf: g.Protobuf,
marshaler: grpcMarshaler{
envelopeWriter: envelopeWriter{
writer: duplexCall,
compressionPool: g.CompressionPools.Get(g.CompressionName),
codec: g.Codec,
compressMinBytes: g.CompressMinBytes,
bufferPool: g.BufferPool,
sendMaxBytes: g.SendMaxBytes,
},
},
unmarshaler: grpcUnmarshaler{
envelopeReader: envelopeReader{
reader: duplexCall,
codec: g.Codec,
bufferPool: g.BufferPool,
readMaxBytes: g.ReadMaxBytes,
},
},
responseHeader: make(http.Header),
responseTrailer: make(http.Header),
}
duplexCall.SetValidateResponse(conn.validateResponse)
conn.readTrailers = func(_ *grpcUnmarshaler, call *duplexHTTPCall) http.Header {
// To access HTTP trailers, we need to read the body to EOF.
_ = discard(call)
return call.ResponseTrailer()
}
return wrapClientConnWithCodedErrors(conn)
}
// grpcClientConn works for both gRPC and gRPC-Web.
type grpcClientConn struct {
spec Spec
peer Peer
duplexCall *duplexHTTPCall
compressionPools readOnlyCompressionPools
bufferPool *bufferPool
protobuf Codec // for errors
marshaler grpcMarshaler
unmarshaler grpcUnmarshaler
responseHeader http.Header
responseTrailer http.Header
readTrailers func(*grpcUnmarshaler, *duplexHTTPCall) http.Header
}
func (cc *grpcClientConn) Spec() Spec {
return cc.spec
}
func (cc *grpcClientConn) Peer() Peer {
return cc.peer
}
func (cc *grpcClientConn) Send(msg interface{}) error {
if err := cc.marshaler.Marshal(msg); err != nil {
return err
}
return nil // must be a literal nil: nil *Error is a non-nil error
}
func (cc *grpcClientConn) RequestHeader() http.Header {
return cc.duplexCall.Header()
}
func (cc *grpcClientConn) CloseRequest() error {
return cc.duplexCall.CloseWrite()
}
func (cc *grpcClientConn) Receive(msg interface{}) error {
cc.duplexCall.BlockUntilResponseReady()
err := cc.unmarshaler.Unmarshal(msg)
if err == nil {
return nil
}
if getHeaderCanonical(cc.responseHeader, grpcHeaderStatus) != "" {
// We got what gRPC calls a trailers-only response, which puts the trailing
// metadata (including errors) into HTTP headers. validateResponse has
// already extracted the error.
return err
}
// See if the server sent an explicit error in the HTTP or gRPC-Web trailers.
mergeHeaders(
cc.responseTrailer,
cc.readTrailers(&cc.unmarshaler, cc.duplexCall),
)
serverErr := grpcErrorFromTrailer(cc.bufferPool, cc.protobuf, cc.responseTrailer)
if serverErr != nil && (errors.Is(err, io.EOF) || !errors.Is(serverErr, errTrailersWithoutGRPCStatus)) {
// We've either:
// - Cleanly read until the end of the response body and *not* received
// gRPC status trailers, which is a protocol error, or
// - Received an explicit error from the server.
//
// This is expected from a protocol perspective, but receiving trailers
// means that we're _not_ getting a message. For users to realize that
// the stream has ended, Receive must return an error.
serverErr.meta = cc.responseHeader.Clone()
mergeHeaders(serverErr.meta, cc.responseTrailer)
cc.duplexCall.SetError(serverErr)
return serverErr
}
// This was probably an error converting the bytes to a message or an error
// reading from the network. We're going to return it to the
// user, but we also want to setResponseError so Send errors out.
cc.duplexCall.SetError(err)
return err
}
func (cc *grpcClientConn) ResponseHeader() http.Header {
cc.duplexCall.BlockUntilResponseReady()
return cc.responseHeader
}
func (cc *grpcClientConn) ResponseTrailer() http.Header {
cc.duplexCall.BlockUntilResponseReady()
return cc.responseTrailer
}
func (cc *grpcClientConn) CloseResponse() error {
return cc.duplexCall.CloseRead()
}
func (cc *grpcClientConn) validateResponse(response *http.Response) *Error {
if err := grpcValidateResponse(
response,
cc.responseHeader,
cc.responseTrailer,
cc.compressionPools,
cc.bufferPool,
cc.protobuf,
); err != nil {
return err
}
compression := getHeaderCanonical(response.Header, grpcHeaderCompression)
cc.unmarshaler.envelopeReader.compressionPool = cc.compressionPools.Get(compression)
return nil
}
// connection based on compression, codec
type grpcHandlerConn struct {
spec Spec
peer Peer
web bool
bufferPool *bufferPool
protobuf Codec // for errors
marshaler grpcMarshaler
responseWriter http.ResponseWriter
responseHeader http.Header
responseTrailer http.Header
wroteToBody bool
request *http.Request
unmarshaler grpcUnmarshaler
}
func (hc *grpcHandlerConn) Spec() Spec {
return hc.spec
}
func (hc *grpcHandlerConn) Peer() Peer {
return hc.peer
}
// Receive delegated receive and unmarshal processes to unmarshaler
func (hc *grpcHandlerConn) Receive(msg interface{}) error {
if err := hc.unmarshaler.Unmarshal(msg); err != nil {
return err // already coded
}
return nil // must be a literal nil: nil *Error is a non-nil error
}
func (hc *grpcHandlerConn) RequestHeader() http.Header {
return hc.request.Header
}
func (hc *grpcHandlerConn) Send(msg interface{}) error {
defer flushResponseWriter(hc.responseWriter)
if !hc.wroteToBody {
mergeHeaders(hc.responseWriter.Header(), hc.responseHeader)
hc.wroteToBody = true
}
if err := hc.marshaler.Marshal(msg); err != nil {
return err
}
return nil // must be a literal nil: nil *Error is a non-nil error
}
func (hc *grpcHandlerConn) ResponseHeader() http.Header {
return hc.responseHeader
}
func (hc *grpcHandlerConn) ResponseTrailer() http.Header {
return hc.responseTrailer
}
func (hc *grpcHandlerConn) Close(err error) (retErr error) {
defer func() {
// We don't want to copy unread portions of the body to /dev/null here: if
// the client hasn't closed the request body, we'll block until the server
// timeout kicks in. This could happen because the client is malicious, but
// a well-intentioned client may just not expect the server to be returning
// an error for a streaming RPC. Better to accept that we can't always reuse
// TCP connections.
closeErr := hc.request.Body.Close()
if retErr == nil {
retErr = closeErr
}
}()
defer flushResponseWriter(hc.responseWriter)
// If we haven't written the headers yet, do so.
if !hc.wroteToBody {
mergeHeaders(hc.responseWriter.Header(), hc.responseHeader)
}
// gRPC always sends the error's code, message, details, and metadata as
// trailing metadata. The Triple protocol doesn't do this, so we don't want
// to mutate the trailers map that the user sees.
mergedTrailers := make(
http.Header,
len(hc.responseTrailer)+2, // always make space for status & message
)
mergeHeaders(mergedTrailers, hc.responseTrailer)
grpcErrorToTrailer(hc.bufferPool, mergedTrailers, hc.protobuf, err)
if hc.web && !hc.wroteToBody {
// We're using gRPC-Web and we haven't yet written to the body. Since we're
// not sending any response messages, the gRPC specification calls this a
// "trailers-only" response. Under those circumstances, the gRPC-Web spec
// says that implementations _may_ send trailing metadata as HTTP headers
// instead. Envoy is the canonical implementation of the gRPC-Web protocol,
// so we emulate Envoy's behavior and put the trailing metadata in the HTTP
// headers.
mergeHeaders(hc.responseWriter.Header(), mergedTrailers)
return nil
}
if hc.web {
// We're using gRPC-Web and we've already sent the headers, so we write
// trailing metadata to the HTTP body.
if err := hc.marshaler.MarshalWebTrailers(mergedTrailers); err != nil {
return err
}
return nil // must be a literal nil: nil *Error is a non-nil error
}
// We're using standard gRPC. Even if we haven't written to the body and
// we're sending a "trailers-only" response, we must send trailing metadata
// as HTTP trailers. (If we had frame-level control of the HTTP/2 layer, we
// could send trailers-only responses as a single HEADER frame and no DATA
// frames, but net/http doesn't expose APIs that low-level.)
if !hc.wroteToBody {
// This block works around a bug in x/net/http2. Until Go 1.20, trailers
// written using http.TrailerPrefix were only sent if either (1) there's
// data in the body, or (2) the innermost http.ResponseWriter is flushed.
// To ensure that we always send a valid gRPC response, even if the user
// has wrapped the response writer in net/http middleware that doesn't
// implement http.Flusher, we must pre-declare our HTTP trailers. We can
// remove this when Go 1.21 ships and we drop support for Go 1.19.
for key := range mergedTrailers {
addHeaderCanonical(hc.responseWriter.Header(), headerTrailer, key)
}
hc.responseWriter.WriteHeader(http.StatusOK)
for key, values := range mergedTrailers {
for _, value := range values {
// These are potentially user-supplied, so we can't assume they're in
// canonical form. Don't use addHeaderCanonical.
hc.responseWriter.Header().Add(key, value)
}
}
return nil
}
// In net/http's ResponseWriter API, we send HTTP trailers by writing to the
// headers map with a special prefix. This prefixing is an implementation
// detail, so we should hide it and _not_ mutate the user-visible headers.
//
// Note that this is _very_ finicky and difficult to test with net/http,
// since correctness depends on low-level framing details. Breaking this
// logic breaks Envoy's gRPC-Web translation.
for key, values := range mergedTrailers {
for _, value := range values {
// These are potentially user-supplied, so we can't assume they're in
// canonical form. Don't use addHeaderCanonical.
hc.responseWriter.Header().Add(http.TrailerPrefix+key, value)
}
}
return nil
}
type grpcMarshaler struct {
envelopeWriter
}
func (m *grpcMarshaler) MarshalWebTrailers(trailer http.Header) *Error {
raw := m.envelopeWriter.bufferPool.Get()
defer m.envelopeWriter.bufferPool.Put(raw)
for key, values := range trailer {
// Per the Go specification, keys inserted during iteration may be produced
// later in the iteration or may be skipped. For safety, avoid mutating the
// map if the key is already lower-cased.
lower := strings.ToLower(key)
if key == lower {
continue
}
delete(trailer, key)
trailer[lower] = values
}
if err := trailer.Write(raw); err != nil {
return errorf(CodeInternal, "format trailers: %w", err)
}
return m.Write(&envelope{
Data: raw,
Flags: grpcFlagEnvelopeTrailer,
})
}
type grpcUnmarshaler struct {
envelopeReader envelopeReader
webTrailer http.Header
}
func (u *grpcUnmarshaler) Unmarshal(message interface{}) *Error {
// delegate read packet and unmarshal processes to envelopeReader
err := u.envelopeReader.Unmarshal(message)
if err == nil {
return nil
}
if !errors.Is(err, errSpecialEnvelope) {
return err
}
// for special envelope
env := u.envelopeReader.last
// for non-web grpc, last envelope needs to set grpcFlagEnvelopeTrailer
if !env.IsSet(grpcFlagEnvelopeTrailer) {
return errorf(CodeInternal, "protocol error: invalid envelope flags %d", env.Flags)
}
// Per the gRPC-Web specification, trailers should be encoded as an HTTP/1
// headers block _without_ the terminating newline. To make the headers
// parseable by net/textproto, we need to add the newline.
if err := env.Data.WriteByte('\n'); err != nil {
return errorf(CodeInternal, "unmarshal web trailers: %w", err)
}
bufferedReader := bufio.NewReader(env.Data)
mimeReader := textproto.NewReader(bufferedReader)
mimeHeader, mimeErr := mimeReader.ReadMIMEHeader()
if mimeErr != nil {
return errorf(
CodeInternal,
"gRPC-Web protocol error: trailers invalid: %w",
mimeErr,
)
}
u.webTrailer = http.Header(mimeHeader)
return errSpecialEnvelope
}
func (u *grpcUnmarshaler) WebTrailer() http.Header {
return u.webTrailer
}
func grpcValidateResponse(
response *http.Response,
header, trailer http.Header,
availableCompressors readOnlyCompressionPools,
bufferPool *bufferPool,
protobuf Codec,
) *Error {
if response.StatusCode != http.StatusOK {
return errorf(grpcHTTPToCode(response.StatusCode), "HTTP status %v", response.Status)
}
if compression := getHeaderCanonical(response.Header, grpcHeaderCompression); compression != "" &&
compression != compressionIdentity &&
!availableCompressors.Contains(compression) {
// Per https://github.com/grpc/grpc/blob/master/doc/compression.md, we
// should return CodeInternal and specify acceptable compression(s) (in
// addition to setting the Grpc-Accept-Encoding header).
return errorf(
CodeInternal,
"unknown encoding %q: accepted encodings are %v",
compression,
availableCompressors.CommaSeparatedNames(),
)
}
// When there's no body, gRPC and gRPC-Web servers may send error information
// in the HTTP headers.
if err := grpcErrorFromTrailer(
bufferPool,
protobuf,
response.Header,
); err != nil && !errors.Is(err, errTrailersWithoutGRPCStatus) {
// Per the specification, only the HTTP status code and Content-Type should
// be treated as headers. The rest should be treated as trailing metadata.
if contentType := getHeaderCanonical(response.Header, headerContentType); contentType != "" {
setHeaderCanonical(header, headerContentType, contentType)
}
mergeHeaders(trailer, response.Header)
delHeaderCanonical(trailer, headerContentType)
// Also set the error metadata
err.meta = header.Clone()
mergeHeaders(err.meta, trailer)
return err
}
// The response is valid, so we should expose the headers.
mergeHeaders(header, response.Header)
return nil
}
func grpcHTTPToCode(httpCode int) Code {
// https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
// Note that this is not just the inverse of the gRPC-to-HTTP mapping.
switch httpCode {
case 400:
return CodeInternal
case 401:
return CodeUnauthenticated
case 403:
return CodePermissionDenied
case 404:
return CodeUnimplemented
case 429:
return CodeUnavailable
case 502, 503, 504:
return CodeUnavailable
default:
return CodeUnknown
}
}
// The gRPC wire protocol specifies that errors should be serialized using the
// binary Protobuf format, even if the messages in the request/response stream
// use a different codec. Consequently, this function needs a Protobuf codec to
// unmarshal error information in the headers.
func grpcErrorFromTrailer(bufferPool *bufferPool, protobuf Codec, trailer http.Header) *Error {
codeHeader := getHeaderCanonical(trailer, grpcHeaderStatus)
if codeHeader == "" {
return NewError(CodeInternal, errTrailersWithoutGRPCStatus)
}
if codeHeader == "0" {
return nil
}
code, err := strconv.ParseUint(codeHeader, 10 /* base */, 32 /* bitsize */)
if err != nil {
return errorf(CodeInternal, "gRPC protocol error: invalid error code %q", codeHeader)
}
message := grpcPercentDecode(bufferPool, getHeaderCanonical(trailer, grpcHeaderMessage))
retErr := NewWireError(Code(code), errors.New(message))
detailsBinaryEncoded := getHeaderCanonical(trailer, grpcHeaderDetails)
if len(detailsBinaryEncoded) > 0 {
detailsBinary, err := DecodeBinaryHeader(detailsBinaryEncoded)
if err != nil {
return errorf(CodeInternal, "server returned invalid grpc-status-details-bin trailer: %w", err)
}
var status statusv1.Status
if err := protobuf.Unmarshal(detailsBinary, &status); err != nil {
return errorf(CodeInternal, "server returned invalid protobuf for error details: %w", err)
}
for _, d := range status.Details {
retErr.details = append(retErr.details, &ErrorDetail{pb: d})
}
// Prefer the Protobuf-encoded data to the headers (grpc-go does this too).
retErr.code = Code(status.Code)
retErr.err = errors.New(status.Message)
}
return retErr
}
func grpcParseTimeout(timeout string) (time.Duration, error) {
if timeout == "" {
return 0, errNoTimeout
}
unit, ok := grpcTimeoutUnitLookup[timeout[len(timeout)-1]]
if !ok {
return 0, fmt.Errorf("gRPC protocol error: timeout %q has invalid unit", timeout)
}
num, err := strconv.ParseInt(timeout[:len(timeout)-1], 10 /* base */, 64 /* bitsize */)
if err != nil || num < 0 {
return 0, fmt.Errorf("gRPC protocol error: invalid timeout %q", timeout)
}
if num > 99999999 { // timeout must be ASCII string of at most 8 digits
return 0, fmt.Errorf("gRPC protocol error: timeout %q is too long", timeout)
}
if unit == time.Hour && num > grpcTimeoutMaxHours {
// Timeout is effectively unbounded, so ignore it. The grpc-go
// implementation does the same thing.
return 0, errNoTimeout
}
return time.Duration(num) * unit, nil
}
func grpcEncodeTimeout(timeout time.Duration) (string, error) {
if timeout <= 0 {
return "0n", nil
}
for _, pair := range grpcTimeoutUnits {
digits := strconv.FormatInt(int64(timeout/pair.size), 10 /* base */)
if len(digits) < grpcMaxTimeoutChars {
return digits + string(pair.char), nil
}
}
// The max time.Duration is smaller than the maximum expressible gRPC
// timeout, so we can't reach this case.
return "", errNoTimeout
}
func grpcCodecFromContentType(contentType string) string {
if contentType == grpcContentTypeDefault {
// implicitly protobuf
return codecNameProto
}
return strings.TrimPrefix(contentType, grpcContentTypePrefix)
}
func grpcContentTypeFromCodecName(name string) string {
return grpcContentTypePrefix + name
}
func grpcErrorToTrailer(bufferPool *bufferPool, trailer http.Header, protobuf Codec, err error) {
if err == nil {
setHeaderCanonical(trailer, grpcHeaderStatus, "0") // zero is the gRPC OK status
setHeaderCanonical(trailer, grpcHeaderMessage, "")
return
}
status := grpcStatusFromError(err)
code := strconv.Itoa(int(status.Code))
bin, binErr := protobuf.Marshal(status)
if binErr != nil {
setHeaderCanonical(
trailer,
grpcHeaderStatus,
strconv.FormatInt(int64(CodeInternal), 10 /* base */),
)
setHeaderCanonical(
trailer,
grpcHeaderMessage,
grpcPercentEncode(
bufferPool,
fmt.Sprintf("marshal protobuf status: %v", binErr),
),
)
return
}
if tripleErr, ok := asError(err); ok {
mergeHeaders(trailer, tripleErr.meta)
}
setHeaderCanonical(trailer, grpcHeaderStatus, code)
setHeaderCanonical(trailer, grpcHeaderMessage, grpcPercentEncode(bufferPool, status.Message))
setHeaderCanonical(trailer, grpcHeaderDetails, EncodeBinaryHeader(bin))
}
func grpcStatusFromError(err error) *statusv1.Status {
status := &statusv1.Status{
Code: int32(CodeUnknown),
Message: err.Error(),
}
if tripleErr, ok := asError(err); ok {
status.Code = int32(tripleErr.Code())
status.Message = tripleErr.Message()
status.Details = tripleErr.detailsAsAny()
}
return status
}
// grpcPercentEncode follows RFC 3986 Section 2.1 and the gRPC HTTP/2 spec.
// It's a variant of URL-encoding with fewer reserved characters. It's intended
// to take UTF-8 encoded text and escape non-ASCII bytes so that they're valid
// HTTP/1 headers, while still maximizing readability of the data on the wire.
//
// The grpc-message trailer (used for human-readable error messages) should be
// percent-encoded.
//
// References:
//
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#responses
// https://datatracker.ietf.org/doc/html/rfc3986#section-2.1
func grpcPercentEncode(bufferPool *bufferPool, msg string) string {
for i := 0; i < len(msg); i++ {
// Characters that need to be escaped are defined in gRPC's HTTP/2 spec.
// They're different from the generic set defined in RFC 3986.
if c := msg[i]; c < ' ' || c > '~' || c == '%' {
return grpcPercentEncodeSlow(bufferPool, msg, i)
}
}
return msg
}
// msg needs some percent-escaping. Bytes before offset don't require
// percent-encoding, so they can be copied to the output as-is.
func grpcPercentEncodeSlow(bufferPool *bufferPool, msg string, offset int) string {
out := bufferPool.Get()
defer bufferPool.Put(out)
out.WriteString(msg[:offset])
for i := offset; i < len(msg); i++ {
c := msg[i]
if c < ' ' || c > '~' || c == '%' {
out.WriteString(fmt.Sprintf("%%%02X", c))
continue
}
out.WriteByte(c)
}
return out.String()
}
func grpcPercentDecode(bufferPool *bufferPool, encoded string) string {
for i := 0; i < len(encoded); i++ {
if c := encoded[i]; c == '%' && i+2 < len(encoded) {
return grpcPercentDecodeSlow(bufferPool, encoded, i)
}
}
return encoded
}
// Similar to percentEncodeSlow: encoded is percent-encoded, and needs to be
// decoded byte-by-byte starting at offset.
func grpcPercentDecodeSlow(bufferPool *bufferPool, encoded string, offset int) string {
out := bufferPool.Get()
defer bufferPool.Put(out)
out.WriteString(encoded[:offset])
for i := offset; i < len(encoded); i++ {
c := encoded[i]
if c != '%' || i+2 >= len(encoded) {
out.WriteByte(c)
continue
}
parsed, err := strconv.ParseUint(encoded[i+1:i+3], 16 /* hex */, 8 /* bitsize */)
if err != nil {
out.WriteRune(utf8.RuneError)
} else {
out.WriteByte(byte(parsed))
}
i += 2
}
return out.String()
}