blob: 3c632f6e9ab2774e53b563527eef93545486129e [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package triple
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"net/http"
"strings"
"time"
)
import (
"github.com/dubbogo/gost/log/logger"
"github.com/dustin/go-humanize"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
"golang.org/x/net/http2"
grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/global"
tri "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol"
dubbotls "dubbo.apache.org/dubbo-go/v3/tls"
)
const (
httpPrefix string = "http://"
httpsPrefix string = "https://"
)
// clientManager wraps triple clients and is responsible for find concrete triple client to invoke
// callUnary, callClientStream, callServerStream, callBidiStream.
// A Reference has a clientManager.
type clientManager struct {
isIDL bool
triClient *tri.Client
healthClient *tri.Client
}
// TODO: code a triple client between clientManager and triple_protocol client
// TODO: write a NewClient for triple client
func (cm *clientManager) callUnary(ctx context.Context, method string, req, resp any) error {
triReq := tri.NewRequest(req)
triResp := tri.NewResponse(resp)
if err := cm.triClient.CallUnary(ctx, triReq, method, triResp); err != nil {
return err
}
return nil
}
func (cm *clientManager) callClientStream(ctx context.Context, method string) (any, error) {
stream, err := cm.triClient.CallClientStream(ctx, method)
if err != nil {
return nil, err
}
return stream, nil
}
func (cm *clientManager) callServerStream(ctx context.Context, method string, req any) (any, error) {
triReq := tri.NewRequest(req)
stream, err := cm.triClient.CallServerStream(ctx, triReq, method)
if err != nil {
return nil, err
}
return stream, nil
}
func (cm *clientManager) callBidiStream(ctx context.Context, method string) (any, error) {
stream, err := cm.triClient.CallBidiStream(ctx, method)
if err != nil {
return nil, err
}
return stream, nil
}
func (cm *clientManager) close() error {
// There is no need to release resources right now.
// But we leave this function here for future use.
return nil
}
// newClientManager extracts configurations from url and builds clientManager
func newClientManager(url *common.URL) (*clientManager, error) {
var cliOpts []tri.ClientOption
var isIDL bool
// set serialization
serialization := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization)
switch serialization {
case constant.ProtobufSerialization:
isIDL = true
case constant.JSONSerialization:
isIDL = true
cliOpts = append(cliOpts, tri.WithProtoJSON())
case constant.Hessian2Serialization:
cliOpts = append(cliOpts, tri.WithHessian2())
case constant.MsgpackSerialization:
cliOpts = append(cliOpts, tri.WithMsgPack())
default:
panic(fmt.Sprintf("Unsupported serialization: %s", serialization))
}
// set timeout
timeout := url.GetParamDuration(constant.TimeoutKey, "")
cliOpts = append(cliOpts, tri.WithTimeout(timeout))
// set service group and version
group := url.GetParam(constant.GroupKey, "")
version := url.GetParam(constant.VersionKey, "")
cliOpts = append(cliOpts, tri.WithGroup(group), tri.WithVersion(version))
// todo(DMwangnima): support opentracing
// handle tls
var (
tlsFlag bool
tlsConf *global.TLSConfig
cfg *tls.Config
err error
)
tlsConfRaw, ok := url.GetAttribute(constant.TLSConfigKey)
if ok {
tlsConf, ok = tlsConfRaw.(*global.TLSConfig)
if !ok {
return nil, errors.New("TRIPLE clientManager initialized the TLSConfig configuration failed")
}
}
if dubbotls.IsClientTLSValid(tlsConf) {
cfg, err = dubbotls.GetClientTlSConfig(tlsConf)
if err != nil {
return nil, err
}
if cfg != nil {
logger.Infof("TRIPLE clientManager initialized the TLSConfig configuration")
tlsFlag = true
}
}
var tripleConf *global.TripleConfig
tripleConfRaw, ok := url.GetAttribute(constant.TripleConfigKey)
if ok {
tripleConf = tripleConfRaw.(*global.TripleConfig)
}
// handle keepalive options
cliKeepAliveOpts, keepAliveInterval, keepAliveTimeout, genKeepAliveOptsErr := genKeepAliveOptions(url, tripleConf)
if genKeepAliveOptsErr != nil {
logger.Errorf("genKeepAliveOpts err: %v", genKeepAliveOptsErr)
return nil, genKeepAliveOptsErr
}
cliOpts = append(cliOpts, cliKeepAliveOpts...)
// handle http transport of triple protocol
var transport http.RoundTripper
var callProtocol string
if tripleConf != nil && tripleConf.Http3 != nil && tripleConf.Http3.Enable {
callProtocol = constant.CallHTTP2AndHTTP3
} else {
// HTTP default type is HTTP/2.
callProtocol = constant.CallHTTP2
}
switch callProtocol {
// This case might be for backward compatibility,
// it's not useful for the Triple protocol, HTTP/1 lacks trailer functionality.
// Triple protocol only supports HTTP/2 and HTTP/3.
case constant.CallHTTP:
transport = &http.Transport{
TLSClientConfig: cfg,
}
cliOpts = append(cliOpts, tri.WithTriple())
case constant.CallHTTP2:
// TODO: Enrich the http2 transport config for triple protocol.
if tlsFlag {
transport = &http2.Transport{
TLSClientConfig: cfg,
ReadIdleTimeout: keepAliveInterval,
PingTimeout: keepAliveTimeout,
DialTLSContext: func(ctx context.Context, network, addr string, tlsConfig *tls.Config) (net.Conn, error) {
return (&tls.Dialer{Config: tlsConfig}).DialContext(ctx, network, addr)
},
}
} else {
transport = &http2.Transport{
DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, network, addr)
},
AllowHTTP: true,
ReadIdleTimeout: keepAliveInterval,
PingTimeout: keepAliveTimeout,
}
}
case constant.CallHTTP3:
if !tlsFlag {
return nil, fmt.Errorf("TRIPLE http3 client must have TLS config, but TLS config is nil")
}
// TODO: Enrich the http3 transport config for triple protocol.
transport = &http3.Transport{
TLSClientConfig: cfg,
QUICConfig: &quic.Config{
// ref: https://quic-go.net/docs/quic/connection/#keeping-a-connection-alive
KeepAlivePeriod: keepAliveInterval,
// ref: https://quic-go.net/docs/quic/connection/#idle-timeout
MaxIdleTimeout: keepAliveTimeout,
},
}
logger.Infof("Triple http3 client transport init successfully")
case constant.CallHTTP2AndHTTP3:
if !tlsFlag {
return nil, fmt.Errorf("TRIPLE HTTP/2 and HTTP/3 client must have TLS config, but TLS config is nil")
}
// Create a dual transport that can handle both HTTP/2 and HTTP/3
transport = newDualTransport(cfg, keepAliveInterval, keepAliveTimeout)
logger.Infof("Triple HTTP/2 and HTTP/3 client transport init successfully")
default:
return nil, fmt.Errorf("unsupported http protocol: %s", callProtocol)
}
httpClient := &http.Client{
Transport: transport,
}
var baseTriURL string
baseTriURL = strings.TrimPrefix(url.Location, httpPrefix)
baseTriURL = strings.TrimPrefix(baseTriURL, httpsPrefix)
if tlsFlag {
baseTriURL = httpsPrefix + baseTriURL
} else {
baseTriURL = httpPrefix + baseTriURL
}
triURL, err := joinPath(baseTriURL, url.Interface())
if err != nil {
return nil, fmt.Errorf("JoinPath failed for base %s, interface %s", baseTriURL, url.Interface())
}
triClient := tri.NewClient(httpClient, triURL, cliOpts...)
healthURL, err := joinPath(baseTriURL, constant.HealthCheckServiceInterface)
if err != nil {
return nil, fmt.Errorf("JoinPath failed for base %s, health interface %s", baseTriURL, constant.HealthCheckServiceInterface)
}
healthClient := tri.NewClient(httpClient, healthURL, tri.WithTimeout(timeout))
return &clientManager{
isIDL: isIDL,
triClient: triClient,
healthClient: healthClient,
}, nil
}
func (cm *clientManager) callHealthWatch(ctx context.Context, service string) (*tri.ServerStreamForClient, error) {
if cm.healthClient == nil {
return nil, errors.New("triple health client is not initialized")
}
req := tri.NewRequest(&grpc_health_v1.HealthCheckRequest{Service: service})
stream, err := cm.healthClient.CallServerStream(ctx, req, "Watch")
if err != nil {
return nil, err
}
return stream, nil
}
func genKeepAliveOptions(url *common.URL, tripleConf *global.TripleConfig) ([]tri.ClientOption, time.Duration, time.Duration, error) {
var cliKeepAliveOpts []tri.ClientOption
// set max send and recv msg size
maxCallRecvMsgSize := constant.DefaultMaxCallRecvMsgSize
if recvMsgSize, err := humanize.ParseBytes(url.GetParam(constant.MaxCallRecvMsgSize, "")); err == nil && recvMsgSize > 0 {
maxCallRecvMsgSize = int(recvMsgSize)
}
cliKeepAliveOpts = append(cliKeepAliveOpts, tri.WithReadMaxBytes(maxCallRecvMsgSize))
maxCallSendMsgSize := constant.DefaultMaxCallSendMsgSize
if sendMsgSize, err := humanize.ParseBytes(url.GetParam(constant.MaxCallSendMsgSize, "")); err == nil && sendMsgSize > 0 {
maxCallSendMsgSize = int(sendMsgSize)
}
cliKeepAliveOpts = append(cliKeepAliveOpts, tri.WithSendMaxBytes(maxCallSendMsgSize))
// set keepalive interval and keepalive timeout
// Deprecated:use tripleconfig
// TODO: remove KeepAliveInterval and KeepAliveInterval in version 4.0.0
keepAliveInterval := url.GetParamDuration(constant.KeepAliveInterval, constant.DefaultKeepAliveInterval)
keepAliveTimeout := url.GetParamDuration(constant.KeepAliveTimeout, constant.DefaultKeepAliveTimeout)
if tripleConf == nil {
return cliKeepAliveOpts, keepAliveInterval, keepAliveTimeout, nil
}
var parseErr error
if tripleConf.KeepAliveInterval != "" {
keepAliveInterval, parseErr = time.ParseDuration(tripleConf.KeepAliveInterval)
if parseErr != nil {
return nil, 0, 0, parseErr
}
}
if tripleConf.KeepAliveTimeout != "" {
keepAliveTimeout, parseErr = time.ParseDuration(tripleConf.KeepAliveTimeout)
if parseErr != nil {
return nil, 0, 0, parseErr
}
}
return cliKeepAliveOpts, keepAliveInterval, keepAliveTimeout, nil
}