| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package getty |
| |
| import ( |
| "math/rand" |
| "time" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-getty" |
| gxsync "github.com/dubbogo/gost/sync" |
| perrors "github.com/pkg/errors" |
| "gopkg.in/yaml.v2" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go/common" |
| "github.com/apache/dubbo-go/common/constant" |
| "github.com/apache/dubbo-go/common/logger" |
| "github.com/apache/dubbo-go/config" |
| "github.com/apache/dubbo-go/remoting" |
| ) |
| |
| var ( |
| errInvalidCodecType = perrors.New("illegal CodecType") |
| errInvalidAddress = perrors.New("remote address invalid or empty") |
| errSessionNotExist = perrors.New("session not exist") |
| errClientClosed = perrors.New("client closed") |
| errClientReadTimeout = perrors.New("client read timeout") |
| |
| clientConf *ClientConfig |
| clientGrpool *gxsync.TaskPool |
| ) |
| |
| // it is init client for single protocol. |
| func initClient(protocol string) { |
| if protocol == "" { |
| return |
| } |
| |
| // load clientconfig from consumer_config |
| // default use dubbo |
| consumerConfig := config.GetConsumerConfig() |
| if consumerConfig.ApplicationConfig == nil { |
| return |
| } |
| protocolConf := config.GetConsumerConfig().ProtocolConf |
| defaultClientConfig := GetDefaultClientConfig() |
| if protocolConf == nil { |
| logger.Info("protocol_conf default use dubbo config") |
| } else { |
| dubboConf := protocolConf.(map[interface{}]interface{})[protocol] |
| if dubboConf == nil { |
| logger.Warnf("dubboConf is nil") |
| return |
| } |
| dubboConfByte, err := yaml.Marshal(dubboConf) |
| if err != nil { |
| panic(err) |
| } |
| err = yaml.Unmarshal(dubboConfByte, &defaultClientConfig) |
| if err != nil { |
| panic(err) |
| } |
| } |
| clientConf = &defaultClientConfig |
| if err := clientConf.CheckValidity(); err != nil { |
| logger.Warnf("[CheckValidity] error: %v", err) |
| return |
| } |
| setClientGrpool() |
| |
| rand.Seed(time.Now().UnixNano()) |
| } |
| |
| // Config ClientConf |
| func SetClientConf(c ClientConfig) { |
| clientConf = &c |
| err := clientConf.CheckValidity() |
| if err != nil { |
| logger.Warnf("[ClientConfig CheckValidity] error: %v", err) |
| return |
| } |
| setClientGrpool() |
| } |
| |
| func setClientGrpool() { |
| if clientConf.GrPoolSize > 1 { |
| clientGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(clientConf.GrPoolSize), gxsync.WithTaskPoolTaskQueueLength(clientConf.QueueLen), |
| gxsync.WithTaskPoolTaskQueueNumber(clientConf.QueueNumber)) |
| } |
| } |
| |
| // Options : param config |
| type Options struct { |
| // connect timeout |
| // remove request timeout, it will be calulate for every request |
| ConnectTimeout time.Duration |
| // request timeout |
| RequestTimeout time.Duration |
| } |
| |
| // Client : some configuration for network communication. |
| type Client struct { |
| addr string |
| opts Options |
| conf ClientConfig |
| pool *gettyRPCClientPool |
| codec remoting.Codec |
| responseHandler remoting.ResponseHandler |
| ExchangeClient *remoting.ExchangeClient |
| } |
| |
| // create client |
| func NewClient(opt Options) *Client { |
| switch { |
| case opt.ConnectTimeout == 0: |
| opt.ConnectTimeout = 3 * time.Second |
| fallthrough |
| case opt.RequestTimeout == 0: |
| opt.RequestTimeout = 3 * time.Second |
| } |
| |
| c := &Client{ |
| opts: opt, |
| } |
| return c |
| } |
| |
| func (c *Client) SetExchangeClient(client *remoting.ExchangeClient) { |
| c.ExchangeClient = client |
| } |
| func (c *Client) SetResponseHandler(responseHandler remoting.ResponseHandler) { |
| c.responseHandler = responseHandler |
| } |
| |
| // init client and try to connection. |
| func (c *Client) Connect(url common.URL) error { |
| initClient(url.Protocol) |
| c.conf = *clientConf |
| // new client |
| c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL)) |
| c.pool.sslEnabled = url.GetParamBool(constant.SSL_ENABLED_KEY, false) |
| |
| // codec |
| c.codec = remoting.GetCodec(url.Protocol) |
| c.addr = url.Location |
| _, _, err := c.selectSession(c.addr) |
| if err != nil { |
| logger.Errorf("try to connect server %v failed for : %v", url.Location, err) |
| } |
| return err |
| } |
| |
| // close network connection |
| func (c *Client) Close() { |
| if c.pool != nil { |
| c.pool.close() |
| } |
| c.pool = nil |
| } |
| |
| // send request |
| func (c *Client) Request(request *remoting.Request, timeout time.Duration, response *remoting.PendingResponse) error { |
| _, session, err := c.selectSession(c.addr) |
| if err != nil { |
| return perrors.WithStack(err) |
| } |
| if session == nil { |
| return errSessionNotExist |
| } |
| |
| if err = c.transfer(session, request, timeout); err != nil { |
| return perrors.WithStack(err) |
| } |
| |
| if !request.TwoWay || response.Callback != nil { |
| return nil |
| } |
| |
| select { |
| case <-getty.GetTimeWheel().After(timeout): |
| return perrors.WithStack(errClientReadTimeout) |
| case <-response.Done: |
| err = response.Err |
| } |
| |
| return perrors.WithStack(err) |
| } |
| |
| func (c *Client) selectSession(addr string) (*gettyRPCClient, getty.Session, error) { |
| rpcClient, err := c.pool.getGettyRpcClient(addr) |
| if err != nil { |
| return nil, nil, perrors.WithStack(err) |
| } |
| return rpcClient, rpcClient.selectSession(), nil |
| } |
| |
| func (c *Client) heartbeat(session getty.Session) error { |
| req := remoting.NewRequest("2.0.2") |
| req.TwoWay = true |
| req.Event = true |
| resp := remoting.NewPendingResponse(req.ID) |
| remoting.AddPendingResponse(resp) |
| return c.transfer(session, req, 3*time.Second) |
| } |
| |
| func (c *Client) transfer(session getty.Session, request *remoting.Request, timeout time.Duration) error { |
| err := session.WritePkg(request, timeout) |
| return perrors.WithStack(err) |
| } |