blob: 6a0fbb347f8e29615c3f514b9c22bce387064bab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package triple_protocol
import (
"context"
)
import (
"github.com/dubbogo/grpc-go"
"github.com/dubbogo/grpc-go/metadata"
)
type compatHandlerStream struct {
ctx context.Context
conn StreamingHandlerConn
}
func (c *compatHandlerStream) SetHeader(md metadata.MD) error {
// todo(DMwangnima): add header method for streaming
return nil
}
func (c *compatHandlerStream) SendHeader(md metadata.MD) error {
// todo(DMwangnima): add header method for streaming
return nil
}
func (c *compatHandlerStream) SetTrailer(md metadata.MD) {
// todo(DMwangnima): add trailer method for streaming
return
}
func (c *compatHandlerStream) Context() context.Context {
return c.ctx
}
func (c *compatHandlerStream) SetContext(ctx context.Context) {
c.ctx = ctx
}
func (c *compatHandlerStream) SendMsg(m interface{}) error {
return c.conn.Send(m)
}
func (c *compatHandlerStream) RecvMsg(m interface{}) error {
return c.conn.Receive(m)
}
func NewCompatStreamHandler(
procedure string,
srv interface{},
typ StreamType,
streamFunc func(srv interface{}, stream grpc.ServerStream) error,
options ...HandlerOption,
) *Handler {
config := newHandlerConfig(procedure, options)
implementation := generateCompatStreamHandlerFunc(procedure, srv, streamFunc, config.Interceptor)
protocolHandlers := config.newProtocolHandlers(typ)
hdl := &Handler{
spec: config.newSpec(typ),
implementations: make(map[string]StreamingHandlerFunc, defaultImplementationsSize),
protocolHandlers: protocolHandlers,
allowMethod: sortedAllowMethodValue(protocolHandlers),
acceptPost: sortedAcceptPostValue(protocolHandlers),
}
hdl.processImplementation(getIdentifier(config.Group, config.Version), implementation)
return hdl
}
func generateCompatStreamHandlerFunc(
procedure string,
srv interface{},
streamFunc func(interface{}, grpc.ServerStream) error,
interceptor Interceptor,
) StreamingHandlerFunc {
implementation := func(ctx context.Context, conn StreamingHandlerConn) error {
stream := &compatHandlerStream{
ctx: ctx,
conn: conn,
}
return streamFunc(srv, stream)
}
if interceptor != nil {
implementation = interceptor.WrapStreamingHandler(implementation)
}
return implementation
}