blob: f2af36a3c99404007e7f5ebdfc1efd6d527dc8f0 [file] [log] [blame]
// Copyright 2021-2023 Buf Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package triple_protocol
import (
"errors"
"io"
"net/http"
)
// ClientStreamForClient is the client's view of a client streaming RPC.
//
// It's returned from [Client].CallClientStream, but doesn't currently have an
// exported constructor function.
type ClientStreamForClient struct {
conn StreamingClientConn
// Error from client construction. If non-nil, return for all calls.
err error
}
// Spec returns the specification for the RPC.
func (c *ClientStreamForClient) Spec() Spec {
return c.conn.Spec()
}
// Peer describes the server for the RPC.
func (c *ClientStreamForClient) Peer() Peer {
return c.conn.Peer()
}
// RequestHeader returns the request headers. Headers are sent to the server with the
// first call to Send.
//
// Headers beginning with "Triple-" and "Grpc-" are reserved for use by the
// Triple and gRPC protocols. Applications shouldn't write them.
func (c *ClientStreamForClient) RequestHeader() http.Header {
if c.err != nil {
return http.Header{}
}
return c.conn.RequestHeader()
}
// Send a message to the server. The first call to Send also sends the request
// headers.
//
// If the server returns an error, Send returns an error that wraps [io.EOF].
// Clients should check for case using the standard library's [errors.Is] and
// unmarshal the error using CloseAndReceive.
func (c *ClientStreamForClient) Send(request any) error {
if c.err != nil {
return c.err
}
if request == nil {
return c.conn.Send(nil)
}
return c.conn.Send(request)
}
// CloseAndReceive closes the send side of the stream and waits for the
// response.
func (c *ClientStreamForClient) CloseAndReceive(response *Response) error {
if c.err != nil {
return c.err
}
if err := c.conn.CloseRequest(); err != nil {
_ = c.conn.CloseResponse()
return err
}
if err := receiveUnaryResponse(c.conn, response); err != nil {
_ = c.conn.CloseResponse()
return err
}
return c.conn.CloseResponse()
}
// Conn exposes the underlying StreamingClientConn. This may be useful if
// you'd prefer to wrap the connection in a different high-level API.
func (c *ClientStreamForClient) Conn() (StreamingClientConn, error) {
return c.conn, c.err
}
// ServerStreamForClient is the client's view of a server streaming RPC.
//
// It's returned from [Client].CallServerStream, but doesn't currently have an
// exported constructor function.
type ServerStreamForClient struct {
conn StreamingClientConn
msg any
// Error from client construction. If non-nil, return for all calls.
constructErr error
// Error from conn.Receive().
receiveErr error
}
// Receive advances the stream to the next message, which will then be
// available through the Msg method. It returns false when the stream stops,
// either by reaching the end or by encountering an unexpected error. After
// Receive returns false, the Err method will return any unexpected error
// encountered.
func (s *ServerStreamForClient) Receive(msg any) bool {
if s.constructErr != nil || s.receiveErr != nil {
return false
}
s.msg = msg
s.receiveErr = s.conn.Receive(s.msg)
return s.receiveErr == nil
}
// Msg returns the most recent message unmarshaled by a call to Receive.
func (s *ServerStreamForClient) Msg() any {
// todo:// processing nil pointer
//if s.msg == nil {
// s.msg = new(Res)
//}
return s.msg
}
// Err returns the first non-EOF error that was encountered by Receive.
func (s *ServerStreamForClient) Err() error {
if s.constructErr != nil {
return s.constructErr
}
if s.receiveErr != nil && !errors.Is(s.receiveErr, io.EOF) {
return s.receiveErr
}
return nil
}
// ResponseHeader returns the headers received from the server. It blocks until
// the first call to Receive returns.
func (s *ServerStreamForClient) ResponseHeader() http.Header {
if s.constructErr != nil {
return http.Header{}
}
return s.conn.ResponseHeader()
}
// ResponseTrailer returns the trailers received from the server. Trailers
// aren't fully populated until Receive() returns an error wrapping io.EOF.
func (s *ServerStreamForClient) ResponseTrailer() http.Header {
if s.constructErr != nil {
return http.Header{}
}
return s.conn.ResponseTrailer()
}
// Close the receive side of the stream.
func (s *ServerStreamForClient) Close() error {
if s.constructErr != nil {
return s.constructErr
}
return s.conn.CloseResponse()
}
// Conn exposes the underlying StreamingClientConn. This may be useful if
// you'd prefer to wrap the connection in a different high-level API.
func (s *ServerStreamForClient) Conn() (StreamingClientConn, error) {
return s.conn, s.constructErr
}
// BidiStreamForClient is the client's view of a bidirectional streaming RPC.
//
// It's returned from [Client].CallBidiStream, but doesn't currently have an
// exported constructor function.
type BidiStreamForClient struct {
conn StreamingClientConn
// Error from client construction. If non-nil, return for all calls.
err error
}
// Spec returns the specification for the RPC.
func (b *BidiStreamForClient) Spec() Spec {
return b.conn.Spec()
}
// Peer describes the server for the RPC.
func (b *BidiStreamForClient) Peer() Peer {
return b.conn.Peer()
}
// RequestHeader returns the request headers. Headers are sent with the first
// call to Send.
//
// Headers beginning with "Triple-" and "Grpc-" are reserved for use by the
// Triple and gRPC protocols. Applications shouldn't write them.
func (b *BidiStreamForClient) RequestHeader() http.Header {
if b.err != nil {
return http.Header{}
}
return b.conn.RequestHeader()
}
// Send a message to the server. The first call to Send also sends the request
// headers. To send just the request headers, without a body, call Send with a
// nil pointer.
//
// If the server returns an error, Send returns an error that wraps [io.EOF].
// Clients should check for EOF using the standard library's [errors.Is] and
// call Receive to retrieve the error.
func (b *BidiStreamForClient) Send(msg any) error {
if b.err != nil {
return b.err
}
if msg == nil {
return b.conn.Send(nil)
}
return b.conn.Send(msg)
}
// CloseRequest closes the send side of the stream.
func (b *BidiStreamForClient) CloseRequest() error {
if b.err != nil {
return b.err
}
return b.conn.CloseRequest()
}
// Receive a message. When the server is done sending messages and no other
// errors have occurred, Receive will return an error that wraps [io.EOF].
func (b *BidiStreamForClient) Receive(msg any) error {
if b.err != nil {
return b.err
}
if err := b.conn.Receive(&msg); err != nil {
return err
}
return nil
}
// CloseResponse closes the receive side of the stream.
func (b *BidiStreamForClient) CloseResponse() error {
if b.err != nil {
return b.err
}
return b.conn.CloseResponse()
}
// ResponseHeader returns the headers received from the server. It blocks until
// the first call to Receive returns.
func (b *BidiStreamForClient) ResponseHeader() http.Header {
if b.err != nil {
return http.Header{}
}
return b.conn.ResponseHeader()
}
// ResponseTrailer returns the trailers received from the server. Trailers
// aren't fully populated until Receive() returns an error wrapping [io.EOF].
func (b *BidiStreamForClient) ResponseTrailer() http.Header {
if b.err != nil {
return http.Header{}
}
return b.conn.ResponseTrailer()
}
// Conn exposes the underlying StreamingClientConn. This may be useful if
// you'd prefer to wrap the connection in a different high-level API.
func (b *BidiStreamForClient) Conn() (StreamingClientConn, error) {
return b.conn, b.err
}