blob: efcfca55868e7042b685adb5fb2452dd2d3a65c0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package remoting
import (
"errors"
"sync"
"time"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)
// pendingResponses maps a request sequence (SequenceType) to the
// *PendingResponse that is waiting for the matching response.
var pendingResponses = &sync.Map{}
// SequenceType is the key type used for entries in pendingResponses.
// Request sequences and response IDs are converted to it before a
// pending response is stored or looked up.
type SequenceType int64
// Client is the interface of the transport client used for network communication.
// If getty is used as the network layer, a GettyClient implementing this
// interface should be defined.
type Client interface {
// SetExchangeClient hands the owning ExchangeClient to the transport client.
SetExchangeClient(client *ExchangeClient)
// SetResponseHandler registers the handler used to deal with response messages.
SetResponseHandler(responseHandler ResponseHandler)
// Connect connects to the given url and returns an error on failure.
Connect(url common.URL) error
// Close closes the client.
Close()
// Request sends request to the server with the given timeout; response is
// the pending response associated with the request.
Request(request *Request, timeout time.Duration, response *PendingResponse) error
}
// ExchangeClient is the abstraction level above the transport Client; it acts
// as a facade that builds requests from invocations, forwards them to the
// underlying Client and handles the responses passed to Handler.
type ExchangeClient struct {
// ConnectTimeout is the timeout for connecting to the server.
ConnectTimeout time.Duration
// address is the server address to dial, in the format ip:port.
address string
// client is the transport that deals with the network. It is an interface;
// per the original note a getty-based client is used by default.
client Client
// init marks whether doInit has already connected successfully.
init bool
}
// ResponseHandler handles the messages received from the server.
// ExchangeClient implements it and is registered on the transport Client
// through SetResponseHandler.
type ResponseHandler interface {
// Handler is invoked with a response coming from the server.
Handler(response *Response)
}
// NewExchangeClient builds an ExchangeClient on top of the given transport client.
//
// The new ExchangeClient is first handed to client via SetExchangeClient.
// Unless lazyInit is true, the connection to url is established immediately
// and nil is returned when that fails. On success the ExchangeClient is
// registered as the transport's response handler and returned.
func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration, lazyInit bool) *ExchangeClient {
	ec := new(ExchangeClient)
	ec.ConnectTimeout = connectTimeout
	ec.address = url.Location
	ec.client = client
	client.SetExchangeClient(ec)

	// Eager mode: connect now; a failed connection yields no client at all.
	if !lazyInit && ec.doInit(url) != nil {
		return nil
	}

	client.SetResponseHandler(ec)
	return ec
}
// doInit connects the underlying transport client to url if that has not
// been done yet.
//
// When the first Connect fails it waits 100ms and tries once more. If the
// retry also fails, the failure is logged together with the connect error
// and an error naming the server location is returned. On success the init
// flag is set so later calls return immediately.
func (cl *ExchangeClient) doInit(url common.URL) error {
	if cl.init {
		return nil
	}
	if cl.client.Connect(url) != nil {
		// retry once after a short pause
		time.Sleep(100 * time.Millisecond)
		if err := cl.client.Connect(url); err != nil {
			// Pass the location as an argument: concatenating it into the
			// format string left %+v without an operand.
			logger.Errorf("Failed to connect server %+v, error: %v", url.Location, err)
			return errors.New("Failed to connect server " + url.Location)
		}
	}
	//FIXME atomic operation
	cl.init = true
	return nil
}
// Request sends a two-way request built from invocation and copies the
// response into result.
//
// The connection is initialised through doInit first; its error is returned
// unchanged. A pending response is registered in pendingResponses before the
// request is handed to the transport client. If the transport returns an
// error, the pending entry is removed again, the error is stored in
// result.Err and returned. Otherwise, when the response result is a
// *protocol.RPCResult, its Rest, Attrs and Err are copied into result.
func (client *ExchangeClient) Request(invocation *protocol.Invocation, url common.URL, timeout time.Duration,
	result *protocol.RPCResult) error {
	if er := client.doInit(url); er != nil {
		return er
	}
	request := NewRequest("2.0.2")
	request.Data = invocation
	request.Event = false
	request.TwoWay = true

	rsp := NewPendingResponse(request.ID)
	rsp.response = NewResponse(request.ID, "2.0.2")
	rsp.Reply = (*invocation).Reply()
	AddPendingResponse(rsp)

	err := client.client.Request(request, timeout, rsp)
	// request error
	if err != nil {
		// Nobody waits for this response any more; without this removal the
		// entry would stay in pendingResponses unless a response for it
		// happened to arrive later. Removing an already-removed entry is a no-op.
		removePendingResponse(SequenceType(rsp.seq))
		result.Err = err
		return err
	}
	if resultTmp, ok := rsp.response.Result.(*protocol.RPCResult); ok {
		result.Rest = resultTmp.Rest
		result.Attrs = resultTmp.Attrs
		result.Err = resultTmp.Err
	}
	return nil
}
// AsyncRequest sends a two-way request built from invocation and arranges
// for callback to be invoked by Handler when the response arrives.
//
// The connection is initialised through doInit first; its error is returned
// unchanged. A pending response carrying callback is registered in
// pendingResponses before the request is handed to the transport client. If
// the transport returns an error, the pending entry is removed again, the
// error is stored in result.Err and returned. Otherwise result.Rest is set
// to the pending response's response value as it is when the transport
// call returns.
func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url common.URL, timeout time.Duration,
	callback common.AsyncCallback, result *protocol.RPCResult) error {
	if er := client.doInit(url); er != nil {
		return er
	}
	request := NewRequest("2.0.2")
	request.Data = invocation
	request.Event = false
	request.TwoWay = true

	rsp := NewPendingResponse(request.ID)
	rsp.response = NewResponse(request.ID, "2.0.2")
	rsp.Callback = callback
	rsp.Reply = (*invocation).Reply()
	AddPendingResponse(rsp)

	err := client.client.Request(request, timeout, rsp)
	if err != nil {
		// The request failed, so drop the registration; otherwise the entry
		// would stay in pendingResponses unless a response for it happened
		// to arrive later. Removing an already-removed entry is a no-op.
		removePendingResponse(SequenceType(rsp.seq))
		result.Err = err
		return err
	}
	result.Rest = rsp.response
	return nil
}
// Send issues a one-way request built from invocation.
//
// The connection is initialised through doInit first. The pending response
// created here is only passed to the transport client; it is not registered
// in pendingResponses. The error of the transport call, if any, is returned.
func (client *ExchangeClient) Send(invocation *protocol.Invocation, url common.URL, timeout time.Duration) error {
	if initErr := client.doInit(url); initErr != nil {
		return initErr
	}

	req := NewRequest("2.0.2")
	req.TwoWay = false
	req.Event = false
	req.Data = invocation

	pending := NewPendingResponse(req.ID)
	pending.response = NewResponse(req.ID, "2.0.2")

	return client.client.Request(req, timeout, pending)
}
// Close closes the underlying transport client.
func (client *ExchangeClient) Close() {
	transport := client.client
	transport.Close()
}
// Handler handles a response received from the server.
//
// It removes the pending response registered under the response ID. When
// none is found, the response is logged and dropped. Otherwise the response
// is attached to the pending response; if a callback is set it is invoked
// with the call response, else the response error is copied to Err and the
// Done channel is signalled.
func (client *ExchangeClient) Handler(response *Response) {
	pendingResponse := removePendingResponse(SequenceType(response.ID))
	if pendingResponse == nil {
		// %+v instead of %s: the response is a struct, and %s renders its
		// non-string fields as %!s(...) noise.
		logger.Errorf("failed to get pending response context for response package %+v", *response)
		return
	}

	pendingResponse.response = response

	if pendingResponse.Callback != nil {
		pendingResponse.Callback(pendingResponse.GetCallResponse())
		return
	}
	pendingResponse.Err = pendingResponse.response.Error
	pendingResponse.Done <- struct{}{}
}
// AddPendingResponse registers pr in pendingResponses, keyed by its
// sequence converted to SequenceType.
func AddPendingResponse(pr *PendingResponse) {
	key := SequenceType(pr.seq)
	pendingResponses.Store(key, pr)
}
// removePendingResponse looks up the pending response stored under seq,
// deletes it from pendingResponses and returns it. It returns nil when the
// map is nil or holds no entry for seq.
func removePendingResponse(seq SequenceType) *PendingResponse {
	if pendingResponses == nil {
		return nil
	}
	stored, found := pendingResponses.Load(seq)
	if !found {
		return nil
	}
	pendingResponses.Delete(seq)
	return stored.(*PendingResponse)
}
// GetPendingResponse returns the pending response stored under seq without
// removing it, or nil when there is no such entry.
func GetPendingResponse(seq SequenceType) *PendingResponse {
	stored, found := pendingResponses.Load(seq)
	if !found {
		return nil
	}
	return stored.(*PendingResponse)
}