// Copyright 2021-2023 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package triple_protocol
import (
"compress/gzip"
"context"
"io"
"net/http"
"time"
)
// A ClientOption configures a [Client].
//
// Implementations mutate the client's configuration through the unexported
// applyToClient method. In addition to any options grouped in the
// documentation below, remember that any [Option] is also a valid
// ClientOption.
type ClientOption interface {
// applyToClient records this option's settings on the given client
// configuration.
applyToClient(*clientConfig)
}
// WithAcceptCompression registers a compression algorithm with a client.
// Clients ask servers to compress responses with any registered algorithm.
// Registration order expresses preference: the first algorithm registered is
// the least preferred and the last one registered is the most preferred.
//
// This option can be used liberally, because servers ignore compression
// algorithms they don't support. To also compress requests, combine it with
// [WithSendCompression]. To drop a previously registered algorithm, call
// WithAcceptCompression with nil decompressor and compressor constructors.
//
// By default clients accept gzipped responses, using a compressor backed by
// the standard library's [gzip] package at the default compression level. Use
// [WithSendGzip] to gzip requests as well.
//
// Calling WithAcceptCompression with an empty name is a no-op.
func WithAcceptCompression(
	name string,
	newDecompressor func() Decompressor,
	newCompressor func() Compressor,
) ClientOption {
	pool := newCompressionPool(newDecompressor, newCompressor)
	return &compressionOption{Name: name, CompressionPool: pool}
}
// WithClientOptions bundles any number of ClientOptions into a single
// ClientOption. The bundled options are applied in the order given.
func WithClientOptions(options ...ClientOption) ClientOption {
	return &clientOptionsOption{options: options}
}
// WithTriple returns a ClientOption that makes the client speak the Triple
// protocol.
func WithTriple() ClientOption {
	return new(tripleOption)
}
// WithProtoJSON makes a client send JSON-encoded data rather than binary
// Protobuf. Encoding follows the standard Protobuf JSON mapping as implemented
// by [google.golang.org/protobuf/encoding/protojson]: field names use
// lowerCamelCase, zero values are omitted, missing required fields are errors,
// enums are emitted as strings, and so on.
func WithProtoJSON() ClientOption {
	jsonCodec := &protoJSONCodec{codecNameJSON}
	return WithCodec(jsonCodec)
}
// WithSendCompression selects the algorithm the client uses to compress
// request messages. The algorithm must have been registered with
// [WithAcceptCompression]; otherwise the client returns errors at runtime.
//
// Clients send uncompressed requests by default, because some servers don't
// support compression.
func WithSendCompression(name string) ClientOption {
	opt := new(sendCompressionOption)
	opt.Name = name
	return opt
}
// WithSendGzip makes the client gzip its requests. Clients have a gzip
// compressor available by default, so WithSendGzip doesn't require
// [WithSendCompression].
//
// Clients send uncompressed requests by default, because some servers don't
// support gzip.
func WithSendGzip() ClientOption {
	gzipRequests := WithSendCompression(compressionGzip)
	return gzipRequests
}
// WithTimeout sets the default timeout for client calls, both unary and
// streaming. To give one particular request its own timeout, use
// context.WithTimeout; that overrides the default configured here.
func WithTimeout(timeout time.Duration) ClientOption {
	opt := new(timeoutOption)
	opt.Timeout = timeout
	return opt
}
// A HandlerOption configures a [Handler].
//
// Implementations mutate the handler's configuration through the unexported
// applyToHandler method. In addition to any options grouped in the
// documentation below, remember that any [Option] is also a HandlerOption.
type HandlerOption interface {
// applyToHandler records this option's settings on the given handler
// configuration.
applyToHandler(*handlerConfig)
}
// WithCompression adds support for a compression algorithm to a handler.
// Clients may then send messages compressed with that algorithm, request
// compressed responses, or both. The [Compressor] and [Decompressor] returned
// by the supplied constructors must implement the same algorithm. Internally,
// Triple pools compressors and decompressors.
//
// Handlers support gzip by default, using the standard library's
// [compress/gzip] package at the default compression level. To drop a
// previously registered algorithm, call WithCompression with nil decompressor
// and compressor constructors.
//
// Calling WithCompression with an empty name is a no-op.
func WithCompression(
	name string,
	newDecompressor func() Decompressor,
	newCompressor func() Compressor,
) HandlerOption {
	pool := newCompressionPool(newDecompressor, newCompressor)
	return &compressionOption{Name: name, CompressionPool: pool}
}
// WithHandlerOptions bundles any number of HandlerOptions into a single
// HandlerOption. The bundled options are applied in the order given.
func WithHandlerOptions(options ...HandlerOption) HandlerOption {
	return &handlerOptionsOption{options: options}
}
// WithRecover installs an interceptor that recovers from panics. The supplied
// function is given the context, the [Spec], the request headers, and the
// recovered value (which may be nil), and must return the error to send back
// to the client. It may additionally log the panic, emit metrics, or run any
// other error-handling logic. Handler functions must be safe to call
// concurrently.
//
// To stay compatible with [net/http]'s semantics, the interceptor doesn't
// handle panics with [http.ErrAbortHandler].
//
// Handlers don't recover from panics by default. The standard library's
// [http.Server] already recovers from panics, so this option usually isn't
// needed to prevent crashes; its purpose is to let servers collect
// RPC-specific data during a panic and send clients a more detailed error.
func WithRecover(handle func(context.Context, Spec, http.Header, interface{}) error) HandlerOption {
	recoverer := &recoverHandlerInterceptor{handle: handle}
	return WithInterceptors(recoverer)
}
// WithRequireTripleProtocolHeader makes the Handler require that requests
// using the Triple RPC protocol carry the Triple-Protocol-Version header. That
// lets HTTP proxies and net/http middleware easily identify valid Triple
// requests even when they use a common Content-Type such as application/json.
// The trade-off is that ad-hoc requests with tools like cURL become more
// laborious.
//
// This option has no effect if the client uses the gRPC or gRPC-Web protocols.
func WithRequireTripleProtocolHeader() HandlerOption {
	return new(requireTripleProtocolHeaderOption)
}
// WithGroup returns an Option that stores group in the Group field of the
// client or handler configuration it is applied to.
func WithGroup(group string) Option {
	return &groupOption{Group: group}
}
// WithVersion returns an Option that stores version in the Version field of
// the client or handler configuration it is applied to.
func WithVersion(version string) Option {
	return &versionOption{Version: version}
}
// Option implements both [ClientOption] and [HandlerOption], so it can be
// applied both client-side and server-side. It adds no methods of its own; it
// is simply the union of the two embedded interfaces.
type Option interface {
ClientOption
HandlerOption
}
// WithCodec registers a serialization method with a client or handler. A
// handler may have several codecs registered and uses whichever one the client
// chooses; a client has exactly one codec.
//
// Out of the box, handlers and clients support binary Protocol Buffer data
// via [google.golang.org/protobuf/proto], and handlers additionally support
// JSON using the standard Protobuf JSON mapping. To override a default codec,
// register a replacement under the "proto" or "json" name. When supplying a
// custom "proto" codec, keep in mind that some unexported, protocol-specific
// messages are serialized using Protobuf - take care to fall back to the
// standard Protobuf implementation if necessary.
//
// Registering a codec with an empty name is a no-op.
func WithCodec(codec Codec) Option {
	opt := new(codecOption)
	opt.Codec = codec
	return opt
}
// WithCompressMinBytes sets the smallest message size that is eligible for
// compression: messages below the configured minimum are sent uncompressed,
// regardless of compressor configuration.
//
// The default minimum is zero. Raising it may improve overall performance,
// because the CPU cost of compressing very small messages usually isn't worth
// the small reduction in network I/O.
func WithCompressMinBytes(min int) Option {
	opt := new(compressMinBytesOption)
	opt.Min = min
	return opt
}
// WithReadMaxBytes bounds the performance impact of pathologically large
// messages sent by the other party. On a handler it caps the size of a
// message the client can send; on a client it caps the size of a message the
// server can respond with. The limit applies to each Protobuf message, not to
// the stream as a whole.
//
// A value of zero allows any message size. Both clients and handlers default
// to allowing any request size.
//
// Handlers may also use [http.MaxBytesHandler] to limit the total size of the
// HTTP request stream (rather than the per-message size). Triple handles
// [http.MaxBytesError] specially, so clients still receive errors with the
// appropriate error code and informative messages.
func WithReadMaxBytes(max int) Option {
	opt := new(readMaxBytesOption)
	opt.Max = max
	return opt
}
// WithSendMaxBytes stops a client or handler from sending messages that are
// too large to handle without significant performance overhead. On a handler
// it caps the size of a message the handler can respond with; on a client it
// caps the size of a message the client can send. The limit applies to each
// message, not to the stream as a whole.
//
// A value of zero allows any message size. Both clients and handlers default
// to allowing any message size.
func WithSendMaxBytes(max int) Option {
	opt := new(sendMaxBytesOption)
	opt.Max = max
	return opt
}
// TODO(DMwangnima): consider how to expose this functionality to users.

// WithIdempotency declares how idempotent a procedure is. The declared level
// can determine whether a procedure call can safely be retried, and may affect
// which request modalities are allowed for a given procedure call.
//
// You should rarely need to set this by hand: it is normally set by the code
// generator for your schema. For protobuf schemas, it can be set like this:
//
//	rpc Ping(PingRequest) returns (PingResponse) {
//	  option idempotency_level = NO_SIDE_EFFECTS;
//	}
func WithIdempotency(idempotencyLevel IdempotencyLevel) Option {
	opt := new(idempotencyOption)
	opt.idempotencyLevel = idempotencyLevel
	return opt
}
// WithInterceptors sets up the interceptor stack of a client or handler.
// Repeated WithInterceptors options are applied in order, so
//
//	WithInterceptors(A) + WithInterceptors(B, C) == WithInterceptors(A, B, C)
//
// Unary interceptors compose like an onion. The first interceptor provided is
// the outermost layer: it acts first on the context and request, and last on
// the response and error.
//
// Stream interceptors behave like an onion too: the first interceptor
// provided is the outermost wrapper for the [StreamingClientConn] or
// [StreamingHandlerConn]. It's the first to see sent messages and the last to
// see received messages.
//
// Applied to client and handler, WithInterceptors(A, B, ..., Y, Z) produces:
//
//	 client.Send()       client.Receive()
//	       |                   ^
//	       v                   |
//	    A ---                 --- A
//	    B ---                 --- B
//	    : ...                 ... :
//	    Y ---                 --- Y
//	    Z ---                 --- Z
//	       |                   ^
//	       v                   |
//	  = = = = = = = = = = = = = = = =
//	               network
//	  = = = = = = = = = = = = = = = =
//	       |                   ^
//	       v                   |
//	    A ---                 --- A
//	    B ---                 --- B
//	    : ...                 ... :
//	    Y ---                 --- Y
//	    Z ---                 --- Z
//	       |                   ^
//	       v                   |
//	handler.Receive()   handler.Send()
//	       |                   ^
//	       |                   |
//	       '-> handler logic >-'
//
// Note that in clients, Send handles the request message(s) and Receive
// handles the response message(s). For handlers, it's the reverse. Depending
// on your interceptor's logic, you may need to wrap one method in clients and
// the other in handlers.
func WithInterceptors(interceptors ...Interceptor) Option {
	return &interceptorsOption{Interceptors: interceptors}
}
// WithOptions bundles any number of Options into a single Option. The bundled
// options are applied in the order given.
func WithOptions(options ...Option) Option {
	return &optionsOption{options: options}
}
// clientOptionsOption bundles several ClientOptions so they can be passed
// around as one ClientOption.
type clientOptionsOption struct {
	options []ClientOption
}

// applyToClient applies every bundled option to cfg, in the order the options
// are stored.
func (c *clientOptionsOption) applyToClient(cfg *clientConfig) {
	for i := range c.options {
		c.options[i].applyToClient(cfg)
	}
}
// codecOption carries a Codec to install on a client or handler.
type codecOption struct {
	Codec Codec
}

// applyToClient makes the codec the client's single codec. A nil codec, or
// one whose Name is empty, leaves the configuration untouched.
func (c *codecOption) applyToClient(cfg *clientConfig) {
	if c.Codec != nil && c.Codec.Name() != "" {
		cfg.Codec = c.Codec
	}
}

// applyToHandler registers the codec in the handler's codec map under the
// codec's own name. A nil codec, or one whose Name is empty, leaves the
// configuration untouched.
func (c *codecOption) applyToHandler(cfg *handlerConfig) {
	if c.Codec != nil && c.Codec.Name() != "" {
		cfg.Codecs[c.Codec.Name()] = c.Codec
	}
}
// compressionOption registers (or, when CompressionPool is nil, removes) the
// compression algorithm called Name on a client or handler.
type compressionOption struct {
	Name            string
	CompressionPool *compressionPool
}

// applyToClient updates the client's compression names and pools.
func (o *compressionOption) applyToClient(config *clientConfig) {
	o.apply(&config.CompressionNames, config.CompressionPools)
}

// applyToHandler updates the handler's compression names and pools.
func (o *compressionOption) applyToHandler(config *handlerConfig) {
	o.apply(&config.CompressionNames, config.CompressionPools)
}

// apply updates the ordered list of algorithm names and the name-to-pool map.
//
//   - An empty Name is a no-op.
//   - A nil CompressionPool removes the algorithm from both the map and the
//     name list.
//   - Otherwise the pool is stored under Name and Name is placed at the end of
//     the list. If Name was already registered, its earlier entry is dropped
//     first, so re-registering an algorithm moves it to the end instead of
//     leaving a duplicate name behind.
//
// configuredPools must be a non-nil map when a pool is being registered.
func (o *compressionOption) apply(configuredNames *[]string, configuredPools map[string]*compressionPool) {
	if o.Name == "" {
		return
	}
	// Collect every other name into a fresh slice, so the caller's backing
	// array is never modified in place.
	var names []string
	for _, name := range *configuredNames {
		if name != o.Name {
			names = append(names, name)
		}
	}
	if o.CompressionPool == nil {
		delete(configuredPools, o.Name)
		*configuredNames = names
		return
	}
	configuredPools[o.Name] = o.CompressionPool
	*configuredNames = append(names, o.Name)
}
// compressMinBytesOption carries the minimum message size for compression.
type compressMinBytesOption struct {
	Min int
}

// applyToClient copies Min into the client's CompressMinBytes setting.
func (m *compressMinBytesOption) applyToClient(cfg *clientConfig) {
	cfg.CompressMinBytes = m.Min
}

// applyToHandler copies Min into the handler's CompressMinBytes setting.
func (m *compressMinBytesOption) applyToHandler(cfg *handlerConfig) {
	cfg.CompressMinBytes = m.Min
}
// readMaxBytesOption carries the per-message read size limit.
type readMaxBytesOption struct {
	Max int
}

// applyToClient copies Max into the client's ReadMaxBytes setting.
func (r *readMaxBytesOption) applyToClient(cfg *clientConfig) {
	cfg.ReadMaxBytes = r.Max
}

// applyToHandler copies Max into the handler's ReadMaxBytes setting.
func (r *readMaxBytesOption) applyToHandler(cfg *handlerConfig) {
	cfg.ReadMaxBytes = r.Max
}
// sendMaxBytesOption carries the per-message send size limit.
type sendMaxBytesOption struct {
	Max int
}

// applyToClient copies Max into the client's SendMaxBytes setting.
func (s *sendMaxBytesOption) applyToClient(cfg *clientConfig) {
	cfg.SendMaxBytes = s.Max
}

// applyToHandler copies Max into the handler's SendMaxBytes setting.
func (s *sendMaxBytesOption) applyToHandler(cfg *handlerConfig) {
	cfg.SendMaxBytes = s.Max
}
// handlerOptionsOption bundles several HandlerOptions so they can be passed
// around as one HandlerOption.
type handlerOptionsOption struct {
	options []HandlerOption
}

// applyToHandler applies every bundled option to cfg, in the order the
// options are stored.
func (h *handlerOptionsOption) applyToHandler(cfg *handlerConfig) {
	for i := range h.options {
		h.options[i].applyToHandler(cfg)
	}
}
// requireTripleProtocolHeaderOption switches on the handler's
// RequireTripleProtocolHeader setting.
type requireTripleProtocolHeaderOption struct{}

// applyToHandler sets RequireTripleProtocolHeader to true on cfg.
func (r *requireTripleProtocolHeaderOption) applyToHandler(cfg *handlerConfig) {
	cfg.RequireTripleProtocolHeader = true
}
// groupOption carries the group value to store on a configuration.
type groupOption struct {
	Group string
}

// applyToClient copies Group into the client configuration.
func (g *groupOption) applyToClient(cfg *clientConfig) {
	cfg.Group = g.Group
}

// applyToHandler copies Group into the handler configuration.
func (g *groupOption) applyToHandler(cfg *handlerConfig) {
	cfg.Group = g.Group
}
// versionOption carries the version value to store on a configuration.
type versionOption struct {
	Version string
}

// applyToClient copies Version into the client configuration.
func (v *versionOption) applyToClient(cfg *clientConfig) {
	cfg.Version = v.Version
}

// applyToHandler copies Version into the handler configuration.
func (v *versionOption) applyToHandler(cfg *handlerConfig) {
	cfg.Version = v.Version
}
// idempotencyOption carries the idempotency level declared for a procedure.
type idempotencyOption struct {
	idempotencyLevel IdempotencyLevel
}

// applyToClient copies the level into the client's IdempotencyLevel setting.
func (i *idempotencyOption) applyToClient(cfg *clientConfig) {
	cfg.IdempotencyLevel = i.idempotencyLevel
}

// applyToHandler copies the level into the handler's IdempotencyLevel setting.
func (i *idempotencyOption) applyToHandler(cfg *handlerConfig) {
	cfg.IdempotencyLevel = i.idempotencyLevel
}
// tripleOption selects the Triple protocol for a client.
type tripleOption struct{}

// applyToClient installs a fresh protocolTriple as the client's Protocol.
func (t *tripleOption) applyToClient(cfg *clientConfig) {
	cfg.Protocol = new(protocolTriple)
}
// interceptorsOption carries interceptors to add to a client's or handler's
// interceptor stack.
type interceptorsOption struct {
	Interceptors []Interceptor
}

// applyToClient chains the carried interceptors after whatever interceptor
// the client configuration already holds.
func (i *interceptorsOption) applyToClient(cfg *clientConfig) {
	cfg.Interceptor = i.chainWith(cfg.Interceptor)
}

// applyToHandler chains the carried interceptors after whatever interceptor
// the handler configuration already holds.
func (i *interceptorsOption) applyToHandler(cfg *handlerConfig) {
	cfg.Interceptor = i.chainWith(cfg.Interceptor)
}

// chainWith combines current with the carried interceptors:
//
//   - with nothing carried, current is returned unchanged;
//   - with a non-nil current, a chain of current followed by the carried
//     interceptors is built;
//   - with a nil current and one carried interceptor, that interceptor is
//     returned directly, without a wrapping chain;
//   - with a nil current and several carried interceptors, a chain of just
//     those is built.
func (i *interceptorsOption) chainWith(current Interceptor) Interceptor {
	switch count := len(i.Interceptors); {
	case count == 0:
		return current
	case current != nil:
		combined := make([]Interceptor, 0, count+1)
		combined = append(combined, current)
		combined = append(combined, i.Interceptors...)
		return newChain(combined)
	case count == 1:
		return i.Interceptors[0]
	default:
		return newChain(i.Interceptors)
	}
}
// optionsOption bundles several Options so they can be passed around as one
// Option.
type optionsOption struct {
	options []Option
}

// applyToClient applies every bundled option to the client configuration, in
// the order the options are stored.
func (b *optionsOption) applyToClient(cfg *clientConfig) {
	for i := range b.options {
		b.options[i].applyToClient(cfg)
	}
}

// applyToHandler applies every bundled option to the handler configuration,
// in the order the options are stored.
func (b *optionsOption) applyToHandler(cfg *handlerConfig) {
	for i := range b.options {
		b.options[i].applyToHandler(cfg)
	}
}
// sendCompressionOption carries the name of the algorithm a client uses to
// compress requests.
type sendCompressionOption struct {
	Name string
}

// applyToClient copies Name into the client's RequestCompressionName setting.
func (s *sendCompressionOption) applyToClient(cfg *clientConfig) {
	cfg.RequestCompressionName = s.Name
}
// timeoutOption carries the default call timeout for a client.
type timeoutOption struct {
	Timeout time.Duration
}

// applyToClient copies Timeout into the client's Timeout setting.
func (t *timeoutOption) applyToClient(cfg *clientConfig) {
	cfg.Timeout = t.Timeout
}
// withGzip returns an Option that registers gzip, under the compressionGzip
// name, using the standard library's compress/gzip package. Decompressors
// start as zero-value gzip readers; compressors start as gzip writers that
// target io.Discard.
func withGzip() Option {
	newReader := func() Decompressor { return new(gzip.Reader) }
	newWriter := func() Compressor { return gzip.NewWriter(io.Discard) }
	return &compressionOption{
		Name:            compressionGzip,
		CompressionPool: newCompressionPool(newReader, newWriter),
	}
}
// withProtoBinaryCodec returns an Option that registers a protoBinaryCodec
// through WithCodec.
func withProtoBinaryCodec() Option {
	binaryCodec := &protoBinaryCodec{}
	return WithCodec(binaryCodec)
}
// withProtoJSONCodecs returns a HandlerOption that registers two
// protoJSONCodec instances: one built with codecNameJSON and one built with
// codecNameJSONCharsetUTF8.
func withProtoJSONCodecs() HandlerOption {
	jsonCodec := WithCodec(&protoJSONCodec{codecNameJSON})
	jsonUTF8Codec := WithCodec(&protoJSONCodec{codecNameJSONCharsetUTF8})
	return WithHandlerOptions(jsonCodec, jsonUTF8Codec)
}