| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package remoting |
| |
| import ( |
| "errors" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go/common" |
| "github.com/apache/dubbo-go/common/logger" |
| "github.com/apache/dubbo-go/protocol" |
| ) |
| |
// pendingResponses holds the responses that are still awaiting a reply.
// Entries are stored by AddPendingResponse, keyed by the SequenceType
// conversion of the pending response's sequence number.
var pendingResponses = &sync.Map{}
| |
// SequenceType is the key type under which pending responses are stored and
// looked up.
type SequenceType int64
| |
| // It is interface of client for network communication. |
| // If you use getty as network communication, you should define GettyClient that implements this interface. |
| type Client interface { |
| SetExchangeClient(client *ExchangeClient) |
| // responseHandler is used to deal with msg |
| SetResponseHandler(responseHandler ResponseHandler) |
| // connect url |
| Connect(url common.URL) error |
| // close |
| Close() |
| // send request to server. |
| Request(request *Request, timeout time.Duration, response *PendingResponse) error |
| } |
| |
| // This is abstraction level. it is like facade. |
| type ExchangeClient struct { |
| // connect server timeout |
| ConnectTimeout time.Duration |
| // to dial server address. The format: ip:port |
| address string |
| // the client that will deal with the transport. It is interface, and it will use gettyClient by default. |
| client Client |
| // the tag for init. |
| init bool |
| } |
| |
| // handle the message from server |
| type ResponseHandler interface { |
| Handler(response *Response) |
| } |
| |
| // create ExchangeClient |
| func NewExchangeClient(url common.URL, client Client, connectTimeout time.Duration, lazyInit bool) *ExchangeClient { |
| exchangeClient := &ExchangeClient{ |
| ConnectTimeout: connectTimeout, |
| address: url.Location, |
| client: client, |
| } |
| client.SetExchangeClient(exchangeClient) |
| if !lazyInit { |
| if err := exchangeClient.doInit(url); err != nil { |
| return nil |
| } |
| } |
| |
| client.SetResponseHandler(exchangeClient) |
| return exchangeClient |
| } |
| |
| func (cl *ExchangeClient) doInit(url common.URL) error { |
| if cl.init { |
| return nil |
| } |
| if cl.client.Connect(url) != nil { |
| //retry for a while |
| time.Sleep(100 * time.Millisecond) |
| if cl.client.Connect(url) != nil { |
| logger.Errorf("Failed to connect server %+v " + url.Location) |
| return errors.New("Failed to connect server " + url.Location) |
| } |
| } |
| //FIXME atomic operation |
| cl.init = true |
| return nil |
| } |
| |
| // two way request |
| func (client *ExchangeClient) Request(invocation *protocol.Invocation, url common.URL, timeout time.Duration, |
| result *protocol.RPCResult) error { |
| if er := client.doInit(url); er != nil { |
| return er |
| } |
| request := NewRequest("2.0.2") |
| request.Data = invocation |
| request.Event = false |
| request.TwoWay = true |
| |
| rsp := NewPendingResponse(request.ID) |
| rsp.response = NewResponse(request.ID, "2.0.2") |
| rsp.Reply = (*invocation).Reply() |
| AddPendingResponse(rsp) |
| |
| err := client.client.Request(request, timeout, rsp) |
| // request error |
| if err != nil { |
| result.Err = err |
| return err |
| } |
| if resultTmp, ok := rsp.response.Result.(*protocol.RPCResult); ok { |
| result.Rest = resultTmp.Rest |
| result.Attrs = resultTmp.Attrs |
| result.Err = resultTmp.Err |
| } |
| return nil |
| } |
| |
| // async two way request |
| func (client *ExchangeClient) AsyncRequest(invocation *protocol.Invocation, url common.URL, timeout time.Duration, |
| callback common.AsyncCallback, result *protocol.RPCResult) error { |
| if er := client.doInit(url); er != nil { |
| return er |
| } |
| request := NewRequest("2.0.2") |
| request.Data = invocation |
| request.Event = false |
| request.TwoWay = true |
| |
| rsp := NewPendingResponse(request.ID) |
| rsp.response = NewResponse(request.ID, "2.0.2") |
| rsp.Callback = callback |
| rsp.Reply = (*invocation).Reply() |
| AddPendingResponse(rsp) |
| |
| err := client.client.Request(request, timeout, rsp) |
| if err != nil { |
| result.Err = err |
| return err |
| } |
| result.Rest = rsp.response |
| return nil |
| } |
| |
| // oneway request |
| func (client *ExchangeClient) Send(invocation *protocol.Invocation, url common.URL, timeout time.Duration) error { |
| if er := client.doInit(url); er != nil { |
| return er |
| } |
| request := NewRequest("2.0.2") |
| request.Data = invocation |
| request.Event = false |
| request.TwoWay = false |
| |
| rsp := NewPendingResponse(request.ID) |
| rsp.response = NewResponse(request.ID, "2.0.2") |
| |
| err := client.client.Request(request, timeout, rsp) |
| if err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| // close client |
| func (client *ExchangeClient) Close() { |
| client.client.Close() |
| // for reinit client |
| client.init = false |
| } |
| |
| // handle the response from server |
| func (client *ExchangeClient) Handler(response *Response) { |
| |
| pendingResponse := removePendingResponse(SequenceType(response.ID)) |
| if pendingResponse == nil { |
| logger.Errorf("failed to get pending response context for response package %s", *response) |
| return |
| } |
| |
| pendingResponse.response = response |
| |
| if pendingResponse.Callback == nil { |
| pendingResponse.Err = pendingResponse.response.Error |
| pendingResponse.Done <- struct{}{} |
| } else { |
| pendingResponse.Callback(pendingResponse.GetCallResponse()) |
| } |
| } |
| |
| // store response into map |
| func AddPendingResponse(pr *PendingResponse) { |
| pendingResponses.Store(SequenceType(pr.seq), pr) |
| } |
| |
| // get and remove response |
| func removePendingResponse(seq SequenceType) *PendingResponse { |
| if pendingResponses == nil { |
| return nil |
| } |
| if presp, ok := pendingResponses.Load(seq); ok { |
| pendingResponses.Delete(seq) |
| return presp.(*PendingResponse) |
| } |
| return nil |
| } |
| |
| // get response |
| func GetPendingResponse(seq SequenceType) *PendingResponse { |
| if presp, ok := pendingResponses.Load(seq); ok { |
| return presp.(*PendingResponse) |
| } |
| return nil |
| } |