| // Copyright 2021-2023 Buf Technologies, Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package triple_protocol |
| |
| import ( |
| "compress/gzip" |
| "context" |
| "io" |
| "net/http" |
| "time" |
| ) |
| |
| // A ClientOption configures a [Client]. |
| // |
| // In addition to any options grouped in the documentation below, remember that |
| // any [Option] is also a valid ClientOption. |
| type ClientOption interface { |
| applyToClient(*clientConfig) |
| } |
| |
| // WithAcceptCompression makes a compression algorithm available to a client. |
| // Clients ask servers to compress responses using any of the registered |
| // algorithms. The first registered algorithm is treated as the least |
| // preferred, and the last registered algorithm is the most preferred. |
| // |
| // It's safe to use this option liberally: servers will ignore any |
| // compression algorithms they don't support. To compress requests, pair this |
| // option with [WithSendCompression]. To remove support for a |
| // previously-registered compression algorithm, use WithAcceptCompression with |
| // nil decompressor and compressor constructors. |
| // |
| // Clients accept gzipped responses by default, using a compressor backed by the |
| // standard library's [gzip] package with the default compression level. Use |
| // [WithSendGzip] to compress requests with gzip. |
| // |
| // Calling WithAcceptCompression with an empty name is a no-op. |
| func WithAcceptCompression( |
| name string, |
| newDecompressor func() Decompressor, |
| newCompressor func() Compressor, |
| ) ClientOption { |
| return &compressionOption{ |
| Name: name, |
| CompressionPool: newCompressionPool(newDecompressor, newCompressor), |
| } |
| } |
| |
| // WithClientOptions composes multiple ClientOptions into one. |
| func WithClientOptions(options ...ClientOption) ClientOption { |
| return &clientOptionsOption{options} |
| } |
| |
| // WithTriple configures clients to use the Triple protocol. |
| func WithTriple() ClientOption { |
| return &tripleOption{} |
| } |
| |
| // WithProtoJSON configures a client to send JSON-encoded data instead of |
| // binary Protobuf. It uses the standard Protobuf JSON mapping as implemented |
| // by [google.golang.org/protobuf/encoding/protojson]: fields are named using |
| // lowerCamelCase, zero values are omitted, missing required fields are errors, |
| // enums are emitted as strings, etc. |
| func WithProtoJSON() ClientOption { |
| return WithCodec(&protoJSONCodec{codecNameJSON}) |
| } |
| |
| // WithSendCompression configures the client to use the specified algorithm to |
| // compress request messages. If the algorithm has not been registered using |
| // [WithAcceptCompression], the client will return errors at runtime. |
| // |
| // Because some servers don't support compression, clients default to sending |
| // uncompressed requests. |
| func WithSendCompression(name string) ClientOption { |
| return &sendCompressionOption{Name: name} |
| } |
| |
| // WithSendGzip configures the client to gzip requests. Since clients have |
| // access to a gzip compressor by default, WithSendGzip doesn't require |
| // [WithSendCompression]. |
| // |
| // Some servers don't support gzip, so clients default to sending uncompressed |
| // requests. |
| func WithSendGzip() ClientOption { |
| return WithSendCompression(compressionGzip) |
| } |
| |
| // WithTimeout configures the default timeout of client call including unary |
| // and stream. If you want to specify the timeout of a specific request, please |
| // use context.WithTimeout, then default timeout would be overridden. |
| func WithTimeout(timeout time.Duration) ClientOption { |
| return &timeoutOption{Timeout: timeout} |
| } |
| |
| // A HandlerOption configures a [Handler]. |
| // |
| // In addition to any options grouped in the documentation below, remember that |
| // any [Option] is also a HandlerOption. |
| type HandlerOption interface { |
| applyToHandler(*handlerConfig) |
| } |
| |
| // WithCompression configures handlers to support a compression algorithm. |
| // Clients may send messages compressed with that algorithm and/or request |
| // compressed responses. The [Compressor] and [Decompressor] produced by the |
| // supplied constructors must use the same algorithm. Internally, Triple pools |
| // compressors and decompressors. |
| // |
| // By default, handlers support gzip using the standard library's |
| // [compress/gzip] package at the default compression level. To remove support for |
| // a previously-registered compression algorithm, use WithCompression with nil |
| // decompressor and compressor constructors. |
| // |
| // Calling WithCompression with an empty name is a no-op. |
| func WithCompression( |
| name string, |
| newDecompressor func() Decompressor, |
| newCompressor func() Compressor, |
| ) HandlerOption { |
| return &compressionOption{ |
| Name: name, |
| CompressionPool: newCompressionPool(newDecompressor, newCompressor), |
| } |
| } |
| |
| // WithHandlerOptions composes multiple HandlerOptions into one. |
| func WithHandlerOptions(options ...HandlerOption) HandlerOption { |
| return &handlerOptionsOption{options} |
| } |
| |
| // WithRecover adds an interceptor that recovers from panics. The supplied |
| // function receives the context, [Spec], request headers, and the recovered |
| // value (which may be nil). It must return an error to send back to the |
| // client. It may also log the panic, emit metrics, or execute other |
| // error-handling logic. Handler functions must be safe to call concurrently. |
| // |
| // To preserve compatibility with [net/http]'s semantics, this interceptor |
| // doesn't handle panics with [http.ErrAbortHandler]. |
| // |
| // By default, handlers don't recover from panics. Because the standard |
| // library's [http.Server] recovers from panics by default, this option isn't |
| // usually necessary to prevent crashes. Instead, it helps servers collect |
| // RPC-specific data during panics and send a more detailed error to |
| // clients. |
| func WithRecover(handle func(context.Context, Spec, http.Header, interface{}) error) HandlerOption { |
| return WithInterceptors(&recoverHandlerInterceptor{handle: handle}) |
| } |
| |
| // WithRequireTripleProtocolHeader configures the Handler to require requests |
| // using the Triple RPC protocol to include the Triple-Protocol-Version |
| // header. This ensures that HTTP proxies and net/http middleware can easily |
| // identify valid Triple requests, even if they use a common Content-Type like |
| // application/json. However, it makes ad-hoc requests with tools like cURL |
| // more laborious. |
| // |
| // This option has no effect if the client uses the gRPC or gRPC-Web protocols. |
| func WithRequireTripleProtocolHeader() HandlerOption { |
| return &requireTripleProtocolHeaderOption{} |
| } |
| |
| // Option implements both [ClientOption] and [HandlerOption], so it can be |
| // applied both client-side and server-side. |
| type Option interface { |
| ClientOption |
| HandlerOption |
| } |
| |
| // WithCodec registers a serialization method with a client or handler. |
| // Handlers may have multiple codecs registered, and use whichever the client |
| // chooses. Clients may only have a single codec. |
| // |
| // By default, handlers and clients support binary Protocol Buffer data using |
| // [google.golang.org/protobuf/proto]. Handlers also support JSON by default, |
| // using the standard Protobuf JSON mapping. Users with more specialized needs |
| // may override the default codecs by registering a new codec under the "proto" |
| // or "json" names. When supplying a custom "proto" codec, keep in mind that |
| // some unexported, protocol-specific messages are serialized using Protobuf - |
| // take care to fall back to the standard Protobuf implementation if |
| // necessary. |
| // |
| // Registering a codec with an empty name is a no-op. |
| func WithCodec(codec Codec) Option { |
| return &codecOption{Codec: codec} |
| } |
| |
| // WithCompressMinBytes sets a minimum size threshold for compression: |
| // regardless of compressor configuration, messages smaller than the configured |
| // minimum are sent uncompressed. |
| // |
| // The default minimum is zero. Setting a minimum compression threshold may |
| // improve overall performance, because the CPU cost of compressing very small |
| // messages usually isn't worth the small reduction in network I/O. |
| func WithCompressMinBytes(min int) Option { |
| return &compressMinBytesOption{Min: min} |
| } |
| |
| // WithReadMaxBytes limits the performance impact of pathologically large |
| // messages sent by the other party. For handlers, WithReadMaxBytes limits the size |
| // of a message that the client can send. For clients, WithReadMaxBytes limits the |
| // size of a message that the server can respond with. Limits apply to each Protobuf |
| // message, not to the stream as a whole. |
| // |
| // Setting WithReadMaxBytes to zero allows any message size. Both clients and |
| // handlers default to allowing any request size. |
| // |
| // Handlers may also use [http.MaxBytesHandler] to limit the total size of the |
| // HTTP request stream (rather than the per-message size). Triple handles |
| // [http.MaxBytesError] specially, so clients still receive errors with the |
| // appropriate error code and informative messages. |
| func WithReadMaxBytes(max int) Option { |
| return &readMaxBytesOption{Max: max} |
| } |
| |
| // WithSendMaxBytes prevents sending messages too large for the client/handler |
| // to handle without significant performance overhead. For handlers, WithSendMaxBytes |
| // limits the size of a message that the handler can respond with. For clients, |
| // WithSendMaxBytes limits the size of a message that the client can send. Limits |
| // apply to each message, not to the stream as a whole. |
| // |
| // Setting WithSendMaxBytes to zero allows any message size. Both clients and |
| // handlers default to allowing any message size. |
| func WithSendMaxBytes(max int) Option { |
| return &sendMaxBytesOption{Max: max} |
| } |
| |
| // todo(DMwangnima): consider how to expose this functionality to users |
| // WithIdempotency declares the idempotency of the procedure. This can determine |
| // whether a procedure call can safely be retried, and may affect which request |
| // modalities are allowed for a given procedure call. |
| // |
| // In most cases, you should not need to manually set this. It is normally set |
| // by the code generator for your schema. For protobuf schemas, it can be set like this: |
| // |
| // rpc Ping(PingRequest) returns (PingResponse) { |
| // option idempotency_level = NO_SIDE_EFFECTS; |
| // } |
| func WithIdempotency(idempotencyLevel IdempotencyLevel) Option { |
| return &idempotencyOption{idempotencyLevel: idempotencyLevel} |
| } |
| |
| // WithInterceptors configures a client or handler's interceptor stack. Repeated |
| // WithInterceptors options are applied in order, so |
| // |
| // WithInterceptors(A) + WithInterceptors(B, C) == WithInterceptors(A, B, C) |
| // |
| // Unary interceptors compose like an onion. The first interceptor provided is |
| // the outermost layer of the onion: it acts first on the context and request, |
| // and last on the response and error. |
| // |
| // Stream interceptors also behave like an onion: the first interceptor |
| // provided is the outermost wrapper for the [StreamingClientConn] or |
| // [StreamingHandlerConn]. It's the first to see sent messages and the last to |
| // see received messages. |
| // |
| // Applied to client and handler, WithInterceptors(A, B, ..., Y, Z) produces: |
| // |
| // client.Send() client.Receive() |
| // | ^ |
| // v | |
| // A --- --- A |
| // B --- --- B |
| // : ... ... : |
| // Y --- --- Y |
| // Z --- --- Z |
| // | ^ |
| // v | |
| // = = = = = = = = = = = = = = = = |
| // network |
| // = = = = = = = = = = = = = = = = |
| // | ^ |
| // v | |
| // A --- --- A |
| // B --- --- B |
| // : ... ... : |
| // Y --- --- Y |
| // Z --- --- Z |
| // | ^ |
| // v | |
| // handler.Receive() handler.Send() |
| // | ^ |
| // | | |
| // '-> handler logic >-' |
| // |
| // Note that in clients, Send handles the request message(s) and Receive |
| // handles the response message(s). For handlers, it's the reverse. Depending |
| // on your interceptor's logic, you may need to wrap one method in clients and |
| // the other in handlers. |
| func WithInterceptors(interceptors ...Interceptor) Option { |
| return &interceptorsOption{interceptors} |
| } |
| |
| // WithOptions composes multiple Options into one. |
| func WithOptions(options ...Option) Option { |
| return &optionsOption{options} |
| } |
| |
| type clientOptionsOption struct { |
| options []ClientOption |
| } |
| |
| func (o *clientOptionsOption) applyToClient(config *clientConfig) { |
| for _, option := range o.options { |
| option.applyToClient(config) |
| } |
| } |
| |
| type codecOption struct { |
| Codec Codec |
| } |
| |
| func (o *codecOption) applyToClient(config *clientConfig) { |
| if o.Codec == nil || o.Codec.Name() == "" { |
| return |
| } |
| config.Codec = o.Codec |
| } |
| |
| func (o *codecOption) applyToHandler(config *handlerConfig) { |
| if o.Codec == nil || o.Codec.Name() == "" { |
| return |
| } |
| config.Codecs[o.Codec.Name()] = o.Codec |
| } |
| |
| type compressionOption struct { |
| Name string |
| CompressionPool *compressionPool |
| } |
| |
| func (o *compressionOption) applyToClient(config *clientConfig) { |
| o.apply(&config.CompressionNames, config.CompressionPools) |
| } |
| |
| func (o *compressionOption) applyToHandler(config *handlerConfig) { |
| o.apply(&config.CompressionNames, config.CompressionPools) |
| } |
| |
| func (o *compressionOption) apply(configuredNames *[]string, configuredPools map[string]*compressionPool) { |
| if o.Name == "" { |
| return |
| } |
| if o.CompressionPool == nil { |
| delete(configuredPools, o.Name) |
| var names []string |
| for _, name := range *configuredNames { |
| if name == o.Name { |
| continue |
| } |
| names = append(names, name) |
| } |
| *configuredNames = names |
| return |
| } |
| configuredPools[o.Name] = o.CompressionPool |
| *configuredNames = append(*configuredNames, o.Name) |
| } |
| |
| type compressMinBytesOption struct { |
| Min int |
| } |
| |
| func (o *compressMinBytesOption) applyToClient(config *clientConfig) { |
| config.CompressMinBytes = o.Min |
| } |
| |
| func (o *compressMinBytesOption) applyToHandler(config *handlerConfig) { |
| config.CompressMinBytes = o.Min |
| } |
| |
| type readMaxBytesOption struct { |
| Max int |
| } |
| |
| func (o *readMaxBytesOption) applyToClient(config *clientConfig) { |
| config.ReadMaxBytes = o.Max |
| } |
| |
| func (o *readMaxBytesOption) applyToHandler(config *handlerConfig) { |
| config.ReadMaxBytes = o.Max |
| } |
| |
| type sendMaxBytesOption struct { |
| Max int |
| } |
| |
| func (o *sendMaxBytesOption) applyToClient(config *clientConfig) { |
| config.SendMaxBytes = o.Max |
| } |
| |
| func (o *sendMaxBytesOption) applyToHandler(config *handlerConfig) { |
| config.SendMaxBytes = o.Max |
| } |
| |
| type handlerOptionsOption struct { |
| options []HandlerOption |
| } |
| |
| func (o *handlerOptionsOption) applyToHandler(config *handlerConfig) { |
| for _, option := range o.options { |
| option.applyToHandler(config) |
| } |
| } |
| |
| type requireTripleProtocolHeaderOption struct{} |
| |
| func (o *requireTripleProtocolHeaderOption) applyToHandler(config *handlerConfig) { |
| config.RequireTripleProtocolHeader = true |
| } |
| |
| type idempotencyOption struct { |
| idempotencyLevel IdempotencyLevel |
| } |
| |
| func (o *idempotencyOption) applyToClient(config *clientConfig) { |
| config.IdempotencyLevel = o.idempotencyLevel |
| } |
| |
| func (o *idempotencyOption) applyToHandler(config *handlerConfig) { |
| config.IdempotencyLevel = o.idempotencyLevel |
| } |
| |
| type tripleOption struct{} |
| |
| func (o *tripleOption) applyToClient(config *clientConfig) { |
| config.Protocol = &protocolTriple{} |
| } |
| |
| type interceptorsOption struct { |
| Interceptors []Interceptor |
| } |
| |
| func (o *interceptorsOption) applyToClient(config *clientConfig) { |
| config.Interceptor = o.chainWith(config.Interceptor) |
| } |
| |
| func (o *interceptorsOption) applyToHandler(config *handlerConfig) { |
| config.Interceptor = o.chainWith(config.Interceptor) |
| } |
| |
| func (o *interceptorsOption) chainWith(current Interceptor) Interceptor { |
| if len(o.Interceptors) == 0 { |
| return current |
| } |
| if current == nil && len(o.Interceptors) == 1 { |
| return o.Interceptors[0] |
| } |
| if current == nil && len(o.Interceptors) > 1 { |
| return newChain(o.Interceptors) |
| } |
| return newChain(append([]Interceptor{current}, o.Interceptors...)) |
| } |
| |
| type optionsOption struct { |
| options []Option |
| } |
| |
| func (o *optionsOption) applyToClient(config *clientConfig) { |
| for _, option := range o.options { |
| option.applyToClient(config) |
| } |
| } |
| |
| func (o *optionsOption) applyToHandler(config *handlerConfig) { |
| for _, option := range o.options { |
| option.applyToHandler(config) |
| } |
| } |
| |
| type sendCompressionOption struct { |
| Name string |
| } |
| |
| func (o *sendCompressionOption) applyToClient(config *clientConfig) { |
| config.RequestCompressionName = o.Name |
| } |
| |
| type timeoutOption struct { |
| Timeout time.Duration |
| } |
| |
| func (o *timeoutOption) applyToClient(config *clientConfig) { |
| config.Timeout = o.Timeout |
| } |
| |
| func withGzip() Option { |
| return &compressionOption{ |
| Name: compressionGzip, |
| CompressionPool: newCompressionPool( |
| func() Decompressor { return &gzip.Reader{} }, |
| func() Compressor { return gzip.NewWriter(io.Discard) }, |
| ), |
| } |
| } |
| |
| func withProtoBinaryCodec() Option { |
| return WithCodec(&protoBinaryCodec{}) |
| } |
| |
| func withProtoJSONCodecs() HandlerOption { |
| return WithHandlerOptions( |
| WithCodec(&protoJSONCodec{codecNameJSON}), |
| WithCodec(&protoJSONCodec{codecNameJSONCharsetUTF8}), |
| ) |
| } |