| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package triple |
| |
| import ( |
| "context" |
| "crypto/tls" |
| "errors" |
| "fmt" |
| "net" |
| "net/http" |
| "strings" |
| "time" |
| ) |
| |
| import ( |
| "github.com/dubbogo/gost/log/logger" |
| |
| "github.com/dustin/go-humanize" |
| |
| "github.com/quic-go/quic-go" |
| "github.com/quic-go/quic-go/http3" |
| |
| "golang.org/x/net/http2" |
| |
| grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/common" |
| "dubbo.apache.org/dubbo-go/v3/common/constant" |
| "dubbo.apache.org/dubbo-go/v3/global" |
| tri "dubbo.apache.org/dubbo-go/v3/protocol/triple/triple_protocol" |
| dubbotls "dubbo.apache.org/dubbo-go/v3/tls" |
| ) |
| |
| const ( |
| httpPrefix string = "http://" |
| httpsPrefix string = "https://" |
| ) |
| |
| // clientManager wraps triple clients and is responsible for find concrete triple client to invoke |
| // callUnary, callClientStream, callServerStream, callBidiStream. |
| // A Reference has a clientManager. |
| type clientManager struct { |
| isIDL bool |
| triClient *tri.Client |
| healthClient *tri.Client |
| } |
| |
| // TODO: code a triple client between clientManager and triple_protocol client |
| // TODO: write a NewClient for triple client |
| |
| func (cm *clientManager) callUnary(ctx context.Context, method string, req, resp any) error { |
| triReq := tri.NewRequest(req) |
| triResp := tri.NewResponse(resp) |
| if err := cm.triClient.CallUnary(ctx, triReq, method, triResp); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| func (cm *clientManager) callClientStream(ctx context.Context, method string) (any, error) { |
| stream, err := cm.triClient.CallClientStream(ctx, method) |
| if err != nil { |
| return nil, err |
| } |
| return stream, nil |
| } |
| |
| func (cm *clientManager) callServerStream(ctx context.Context, method string, req any) (any, error) { |
| triReq := tri.NewRequest(req) |
| stream, err := cm.triClient.CallServerStream(ctx, triReq, method) |
| if err != nil { |
| return nil, err |
| } |
| return stream, nil |
| } |
| |
| func (cm *clientManager) callBidiStream(ctx context.Context, method string) (any, error) { |
| stream, err := cm.triClient.CallBidiStream(ctx, method) |
| if err != nil { |
| return nil, err |
| } |
| return stream, nil |
| } |
| |
| func (cm *clientManager) close() error { |
| // There is no need to release resources right now. |
| // But we leave this function here for future use. |
| return nil |
| } |
| |
| // newClientManager extracts configurations from url and builds clientManager |
| func newClientManager(url *common.URL) (*clientManager, error) { |
| var cliOpts []tri.ClientOption |
| var isIDL bool |
| |
| // set serialization |
| serialization := url.GetParam(constant.SerializationKey, constant.ProtobufSerialization) |
| switch serialization { |
| case constant.ProtobufSerialization: |
| isIDL = true |
| case constant.JSONSerialization: |
| isIDL = true |
| cliOpts = append(cliOpts, tri.WithProtoJSON()) |
| case constant.Hessian2Serialization: |
| cliOpts = append(cliOpts, tri.WithHessian2()) |
| case constant.MsgpackSerialization: |
| cliOpts = append(cliOpts, tri.WithMsgPack()) |
| default: |
| panic(fmt.Sprintf("Unsupported serialization: %s", serialization)) |
| } |
| |
| // set timeout |
| timeout := url.GetParamDuration(constant.TimeoutKey, "") |
| cliOpts = append(cliOpts, tri.WithTimeout(timeout)) |
| |
| // set service group and version |
| group := url.GetParam(constant.GroupKey, "") |
| version := url.GetParam(constant.VersionKey, "") |
| cliOpts = append(cliOpts, tri.WithGroup(group), tri.WithVersion(version)) |
| |
| // todo(DMwangnima): support opentracing |
| |
| // handle tls |
| var ( |
| tlsFlag bool |
| tlsConf *global.TLSConfig |
| cfg *tls.Config |
| err error |
| ) |
| |
| tlsConfRaw, ok := url.GetAttribute(constant.TLSConfigKey) |
| if ok { |
| tlsConf, ok = tlsConfRaw.(*global.TLSConfig) |
| if !ok { |
| return nil, errors.New("TRIPLE clientManager initialized the TLSConfig configuration failed") |
| } |
| } |
| if dubbotls.IsClientTLSValid(tlsConf) { |
| cfg, err = dubbotls.GetClientTlSConfig(tlsConf) |
| if err != nil { |
| return nil, err |
| } |
| if cfg != nil { |
| logger.Infof("TRIPLE clientManager initialized the TLSConfig configuration") |
| tlsFlag = true |
| } |
| } |
| |
| var tripleConf *global.TripleConfig |
| |
| tripleConfRaw, ok := url.GetAttribute(constant.TripleConfigKey) |
| if ok { |
| tripleConf = tripleConfRaw.(*global.TripleConfig) |
| } |
| |
| // handle keepalive options |
| cliKeepAliveOpts, keepAliveInterval, keepAliveTimeout, genKeepAliveOptsErr := genKeepAliveOptions(url, tripleConf) |
| if genKeepAliveOptsErr != nil { |
| logger.Errorf("genKeepAliveOpts err: %v", genKeepAliveOptsErr) |
| return nil, genKeepAliveOptsErr |
| } |
| cliOpts = append(cliOpts, cliKeepAliveOpts...) |
| |
| // handle http transport of triple protocol |
| var transport http.RoundTripper |
| |
| var callProtocol string |
| if tripleConf != nil && tripleConf.Http3 != nil && tripleConf.Http3.Enable { |
| callProtocol = constant.CallHTTP2AndHTTP3 |
| } else { |
| // HTTP default type is HTTP/2. |
| callProtocol = constant.CallHTTP2 |
| } |
| |
| switch callProtocol { |
| // This case might be for backward compatibility, |
| // it's not useful for the Triple protocol, HTTP/1 lacks trailer functionality. |
| // Triple protocol only supports HTTP/2 and HTTP/3. |
| case constant.CallHTTP: |
| transport = &http.Transport{ |
| TLSClientConfig: cfg, |
| } |
| cliOpts = append(cliOpts, tri.WithTriple()) |
| case constant.CallHTTP2: |
| // TODO: Enrich the http2 transport config for triple protocol. |
| if tlsFlag { |
| transport = &http2.Transport{ |
| TLSClientConfig: cfg, |
| ReadIdleTimeout: keepAliveInterval, |
| PingTimeout: keepAliveTimeout, |
| DialTLSContext: func(ctx context.Context, network, addr string, tlsConfig *tls.Config) (net.Conn, error) { |
| return (&tls.Dialer{Config: tlsConfig}).DialContext(ctx, network, addr) |
| }, |
| } |
| } else { |
| transport = &http2.Transport{ |
| DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) { |
| return (&net.Dialer{}).DialContext(ctx, network, addr) |
| }, |
| AllowHTTP: true, |
| ReadIdleTimeout: keepAliveInterval, |
| PingTimeout: keepAliveTimeout, |
| } |
| } |
| case constant.CallHTTP3: |
| if !tlsFlag { |
| return nil, fmt.Errorf("TRIPLE http3 client must have TLS config, but TLS config is nil") |
| } |
| |
| // TODO: Enrich the http3 transport config for triple protocol. |
| transport = &http3.Transport{ |
| TLSClientConfig: cfg, |
| QUICConfig: &quic.Config{ |
| // ref: https://quic-go.net/docs/quic/connection/#keeping-a-connection-alive |
| KeepAlivePeriod: keepAliveInterval, |
| // ref: https://quic-go.net/docs/quic/connection/#idle-timeout |
| MaxIdleTimeout: keepAliveTimeout, |
| }, |
| } |
| |
| logger.Infof("Triple http3 client transport init successfully") |
| case constant.CallHTTP2AndHTTP3: |
| if !tlsFlag { |
| return nil, fmt.Errorf("TRIPLE HTTP/2 and HTTP/3 client must have TLS config, but TLS config is nil") |
| } |
| |
| // Create a dual transport that can handle both HTTP/2 and HTTP/3 |
| transport = newDualTransport(cfg, keepAliveInterval, keepAliveTimeout) |
| logger.Infof("Triple HTTP/2 and HTTP/3 client transport init successfully") |
| default: |
| return nil, fmt.Errorf("unsupported http protocol: %s", callProtocol) |
| } |
| |
| httpClient := &http.Client{ |
| Transport: transport, |
| } |
| |
| var baseTriURL string |
| baseTriURL = strings.TrimPrefix(url.Location, httpPrefix) |
| baseTriURL = strings.TrimPrefix(baseTriURL, httpsPrefix) |
| if tlsFlag { |
| baseTriURL = httpsPrefix + baseTriURL |
| } else { |
| baseTriURL = httpPrefix + baseTriURL |
| } |
| |
| triURL, err := joinPath(baseTriURL, url.Interface()) |
| if err != nil { |
| return nil, fmt.Errorf("JoinPath failed for base %s, interface %s", baseTriURL, url.Interface()) |
| } |
| |
| triClient := tri.NewClient(httpClient, triURL, cliOpts...) |
| healthURL, err := joinPath(baseTriURL, constant.HealthCheckServiceInterface) |
| if err != nil { |
| return nil, fmt.Errorf("JoinPath failed for base %s, health interface %s", baseTriURL, constant.HealthCheckServiceInterface) |
| } |
| healthClient := tri.NewClient(httpClient, healthURL, tri.WithTimeout(timeout)) |
| |
| return &clientManager{ |
| isIDL: isIDL, |
| triClient: triClient, |
| healthClient: healthClient, |
| }, nil |
| } |
| |
| func (cm *clientManager) callHealthWatch(ctx context.Context, service string) (*tri.ServerStreamForClient, error) { |
| if cm.healthClient == nil { |
| return nil, errors.New("triple health client is not initialized") |
| } |
| req := tri.NewRequest(&grpc_health_v1.HealthCheckRequest{Service: service}) |
| stream, err := cm.healthClient.CallServerStream(ctx, req, "Watch") |
| if err != nil { |
| return nil, err |
| } |
| return stream, nil |
| } |
| |
| func genKeepAliveOptions(url *common.URL, tripleConf *global.TripleConfig) ([]tri.ClientOption, time.Duration, time.Duration, error) { |
| var cliKeepAliveOpts []tri.ClientOption |
| |
| // set max send and recv msg size |
| maxCallRecvMsgSize := constant.DefaultMaxCallRecvMsgSize |
| if recvMsgSize, err := humanize.ParseBytes(url.GetParam(constant.MaxCallRecvMsgSize, "")); err == nil && recvMsgSize > 0 { |
| maxCallRecvMsgSize = int(recvMsgSize) |
| } |
| cliKeepAliveOpts = append(cliKeepAliveOpts, tri.WithReadMaxBytes(maxCallRecvMsgSize)) |
| maxCallSendMsgSize := constant.DefaultMaxCallSendMsgSize |
| if sendMsgSize, err := humanize.ParseBytes(url.GetParam(constant.MaxCallSendMsgSize, "")); err == nil && sendMsgSize > 0 { |
| maxCallSendMsgSize = int(sendMsgSize) |
| } |
| cliKeepAliveOpts = append(cliKeepAliveOpts, tri.WithSendMaxBytes(maxCallSendMsgSize)) |
| |
| // set keepalive interval and keepalive timeout |
| // Deprecated:use tripleconfig |
| // TODO: remove KeepAliveInterval and KeepAliveInterval in version 4.0.0 |
| keepAliveInterval := url.GetParamDuration(constant.KeepAliveInterval, constant.DefaultKeepAliveInterval) |
| keepAliveTimeout := url.GetParamDuration(constant.KeepAliveTimeout, constant.DefaultKeepAliveTimeout) |
| |
| if tripleConf == nil { |
| return cliKeepAliveOpts, keepAliveInterval, keepAliveTimeout, nil |
| } |
| |
| var parseErr error |
| |
| if tripleConf.KeepAliveInterval != "" { |
| keepAliveInterval, parseErr = time.ParseDuration(tripleConf.KeepAliveInterval) |
| if parseErr != nil { |
| return nil, 0, 0, parseErr |
| } |
| } |
| if tripleConf.KeepAliveTimeout != "" { |
| keepAliveTimeout, parseErr = time.ParseDuration(tripleConf.KeepAliveTimeout) |
| if parseErr != nil { |
| return nil, 0, 0, parseErr |
| } |
| } |
| |
| return cliKeepAliveOpts, keepAliveInterval, keepAliveTimeout, nil |
| } |