blob: 3cf0e044256ecc8e95d57c54a4c1459d1227244b [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remoting
import (
"bytes"
"encoding/binary"
"errors"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/api/model"
"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
"github.com/golang/glog"
"math/rand"
"net"
"strconv"
"strings"
"sync"
"time"
)
//RemotingClient is the set of invocation styles offered by the remoting layer.
//DefaultRemotingClient in this file provides all three methods.
type RemotingClient interface {
//InvokeSync sends request to addr and waits for the matching response.
//timeoutMillis is the maximum wait in milliseconds; an error is returned
//when the request cannot be sent or no response arrives in time.
InvokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (remotingCommand *RemotingCommand, err error)
//InvokeAsync sends request to addr and returns once it has been written;
//invokeCallback is called later with the response future.
InvokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error
//InvokeOneWay sends request to addr without tracking any response.
//NOTE(review): DefaultRemotingClient does not use timeoutMillis here.
InvokeOneWay(addr string, request *RemotingCommand, timeoutMillis int64) error
}
//DefaultRemotingClient is the RemotingClient implementation defined in this
//file. It keeps one net.Conn per remote address in connTable and matches
//incoming responses to pending requests through responseTable.
type DefaultRemotingClient struct {
//clientId is not read or written in the visible part of this file.
clientId string
//clientConfig is the configuration passed to RemotingClientInit.
clientConfig *rocketmqm.MqClientConfig
//connTable maps a remote address to its open connection; guarded by connTableLock.
connTable map[string]net.Conn
connTableLock sync.RWMutex
//responseTable holds pending futures, keyed by the decimal string of the request opaque.
responseTable util.ConcurrentMap //map[int32]*ResponseFuture
//processorTable is neither initialised nor used in the visible part of this file.
processorTable util.ConcurrentMap //map[int]ClientRequestProcessor //requestCode|ClientRequestProcessor
//namesrvAddrList is NameServerAddress from the config split on ";".
namesrvAddrList []string
//namesrvAddrSelectedAddr and namesrvAddrSelectedIndex record the name server
//chosen by getNamesvrConn (index is -1 until one is chosen); guarded by namesvrLockRW.
namesrvAddrSelectedAddr string
namesrvAddrSelectedIndex int
namesvrLockRW sync.RWMutex
//clientRequestProcessor handles requests initiated by the remote side and
//returns the command to send back (nil for no reply).
clientRequestProcessor ClientRequestProcessor //manages and registers the processors here
//serializerHandler encodes outgoing headers and decodes incoming commands.
serializerHandler SerializerHandler //rocketmq encode decode
}
//RemotingClientInit builds a DefaultRemotingClient from clientConfig.
//
//The name server list is clientConfig.NameServerAddress split on ";", no name
//server is selected yet (index -1), and clientRequestProcessor is installed as
//the handler for requests initiated by the remote side. No connection is
//opened here; connections are created lazily on first use.
func RemotingClientInit(clientConfig *rocketmqm.MqClientConfig, clientRequestProcessor ClientRequestProcessor) (client *DefaultRemotingClient) {
	return &DefaultRemotingClient{
		connTable:                make(map[string]net.Conn),
		responseTable:            util.NewConcurrentMap(),
		clientConfig:             clientConfig,
		namesrvAddrList:          strings.Split(clientConfig.NameServerAddress, ";"),
		namesrvAddrSelectedIndex: -1,
		clientRequestProcessor:   clientRequestProcessor,
		serializerHandler:        newSerializerHandler(clientConfig.ClientSerializeType),
	}
}
//InvokeSync sends request to addr and blocks until the matching response
//arrives or timeoutMillis milliseconds have elapsed.
//
//An empty addr routes the request to a name server connection. On success the
//response command is returned. An error is returned when no connection can be
//obtained, when writing the request fails, or when the wait times out; in the
//last two cases the pending entry is removed from responseTable so it does
//not linger.
func (drc *DefaultRemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (remotingCommand *RemotingCommand, err error) {
	var conn net.Conn
	conn, err = drc.getOrCreateConn(addr)
	if err != nil {
		//without a connection there is nothing to write to
		return nil, err
	}
	response := &ResponseFuture{
		SendRequestOK:  false,
		Opaque:         request.Opaque,
		TimeoutMillis:  timeoutMillis,
		BeginTimestamp: time.Now().Unix(),
		//buffered so the goroutine delivering the response never blocks,
		//even when this call has already given up waiting
		Done: make(chan bool, 1),
	}
	header := drc.serializerHandler.encodeHeader(request)
	//register before sending so a fast reply always finds its future
	drc.setResponse(request.Opaque, response)
	if err = drc.sendRequest(header, request.Body, conn, addr); err != nil {
		drc.removeResponse(request.Opaque)
		glog.Error(err)
		return nil, err
	}
	timer := time.NewTimer(time.Duration(timeoutMillis) * time.Millisecond)
	defer timer.Stop()
	select {
	case <-response.Done:
		return response.ResponseCommand, nil
	case <-timer.C:
		drc.removeResponse(request.Opaque)
		return nil, errors.New("invoke sync timeout:" + strconv.FormatInt(timeoutMillis, 10) + " Millisecond")
	}
}
//InvokeAsync writes request to addr and returns without waiting for the reply.
//
//A ResponseFuture carrying invokeCallback is stored under request.Opaque
//before the request is written; the callback is run when the matching
//response is received. The returned error reports a failure to obtain the
//connection or to write the request.
func (drc *DefaultRemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error {
	conn, connErr := drc.getOrCreateConn(addr)
	if connErr != nil {
		return connErr
	}
	future := &ResponseFuture{
		SendRequestOK:  false,
		Opaque:         request.Opaque,
		TimeoutMillis:  timeoutMillis,
		BeginTimestamp: time.Now().Unix(),
		InvokeCallback: invokeCallback,
	}
	drc.setResponse(request.Opaque, future)
	encodedHeader := drc.serializerHandler.encodeHeader(request)
	sendErr := drc.sendRequest(encodedHeader, request.Body, conn, addr)
	if sendErr != nil {
		glog.Error(sendErr)
	}
	return sendErr
}
//InvokeOneWay writes request to addr and does not register anything to
//receive a response. timeoutMillis is accepted for interface compatibility
//but is not used. The returned error reports a failure to obtain the
//connection or to write the request.
func (drc *DefaultRemotingClient) InvokeOneWay(addr string, request *RemotingCommand, timeoutMillis int64) error {
	conn, connErr := drc.getOrCreateConn(addr)
	if connErr != nil {
		return connErr
	}
	encodedHeader := drc.serializerHandler.encodeHeader(request)
	sendErr := drc.sendRequest(encodedHeader, request.Body, conn, addr)
	if sendErr != nil {
		glog.Error(sendErr)
	}
	return sendErr
}
//sendRequest writes header followed by body (when non-empty) to conn in a
//single Write call. When the write fails the error is logged and returned,
//and, if addr is non-empty, the connection is released from the table.
func (drc *DefaultRemotingClient) sendRequest(header, body []byte, conn net.Conn, addr string) error {
	var frame []byte
	frame = append(frame, header...)
	if len(body) != 0 {
		frame = append(frame, body...)
	}
	if _, writeErr := conn.Write(frame); writeErr != nil {
		glog.Error(writeErr)
		if addr != "" {
			drc.releaseConn(addr, conn)
		}
		return writeErr
	}
	return nil
}
//GetNamesrvAddrList returns the configured name server addresses. The
//client's own slice is returned, not a copy.
func (drc *DefaultRemotingClient) GetNamesrvAddrList() []string {
	addrs := drc.namesrvAddrList
	return addrs
}
//setResponse stores response in responseTable under the decimal string form
//of index.
func (drc *DefaultRemotingClient) setResponse(index int32, response *ResponseFuture) {
	key := strconv.FormatInt(int64(index), 10)
	drc.responseTable.Set(key, response)
}
//getResponse looks up the future stored under index. It returns an error
//when no entry exists for that index.
func (drc *DefaultRemotingClient) getResponse(index int32) (response *ResponseFuture, err error) {
	key := strconv.FormatInt(int64(index), 10)
	if stored, found := drc.responseTable.Get(key); found {
		return stored.(*ResponseFuture), nil
	}
	return nil, errors.New("get conn from responseTable error")
}
//removeResponse deletes the entry stored under index from responseTable.
func (drc *DefaultRemotingClient) removeResponse(index int32) {
	key := strconv.FormatInt(int64(index), 10)
	drc.responseTable.Remove(key)
}
//getOrCreateConn returns a connection for address. An empty address means
//"any name server"; otherwise the cached connection is reused when present
//and a new one is created when not.
func (drc *DefaultRemotingClient) getOrCreateConn(address string) (conn net.Conn, err error) {
	if address == "" {
		return drc.getNamesvrConn()
	}
	if cached := drc.getConn(address); cached != nil {
		return cached, nil
	}
	return drc.createConn(address)
}
//getConn returns the connection cached for address, or nil when there is
//none. The lookup is done under the connTable read lock.
func (drc *DefaultRemotingClient) getConn(address string) (conn net.Conn) {
	drc.connTableLock.RLock()
	defer drc.connTableLock.RUnlock()
	return drc.connTable[address]
}
//createConn returns the connection for address, dialing a new one when the
//table has none. The whole operation runs under the connTable write lock, so
//concurrent callers for the same address share a single connection.
//
//When dialing fails the error is returned and nothing is stored, so the
//table never holds a nil entry for an unreachable address.
func (drc *DefaultRemotingClient) createConn(address string) (conn net.Conn, err error) {
	drc.connTableLock.Lock()
	defer drc.connTableLock.Unlock()
	//another goroutine may have connected while we waited for the lock
	if existing := drc.connTable[address]; existing != nil {
		return existing, nil
	}
	conn, err = drc.createAndHandleTcpConn(address)
	if err != nil {
		return nil, err
	}
	drc.connTable[address] = conn
	return conn, nil
}
//getNamesvrConn returns a connection to a name server.
//
//The previously selected name server is reused while its connection is still
//in the table. Otherwise, under the write lock, the address list is walked
//once starting just after the last selected index (a random start is picked
//when nothing has been selected yet), skipping empty entries, and the first
//address that connects becomes the selected one. An error is returned when
//no address can be connected.
func (drc *DefaultRemotingClient) getNamesvrConn() (conn net.Conn, err error) {
	//fast path: only a read lock is needed to look at the current selection
	drc.namesvrLockRW.RLock()
	selected := drc.namesrvAddrSelectedAddr
	drc.namesvrLockRW.RUnlock()
	if selected != "" {
		if conn = drc.getConn(selected); conn != nil {
			return conn, nil
		}
	}

	drc.namesvrLockRW.Lock()
	defer drc.namesvrLockRW.Unlock()
	//re-check: another write lock owner may already have connected
	selected = drc.namesrvAddrSelectedAddr
	if selected != "" {
		if conn = drc.getConn(selected); conn != nil {
			return conn, nil
		}
	}

	total := len(drc.namesrvAddrList)
	if drc.namesrvAddrSelectedIndex < 0 {
		drc.namesrvAddrSelectedIndex = rand.Intn(total)
	}
	for offset := 1; offset <= total; offset++ {
		candidateIndex := (drc.namesrvAddrSelectedIndex + offset) % total
		candidate := drc.namesrvAddrList[candidateIndex]
		if candidate == "" {
			continue
		}
		conn, err = drc.createConn(candidate)
		if err != nil {
			continue
		}
		drc.namesrvAddrSelectedAddr = candidate
		drc.namesrvAddrSelectedIndex = candidateIndex
		return conn, nil
	}
	return conn, errors.New("all namesvrAddress can't use!,address:" + drc.clientConfig.NameServerAddress)
}
//createAndHandleTcpConn dials address over TCP and, on success, starts a
//goroutine running handlerReceiveLoop for the new connection. A dial error is
//logged and returned with a nil connection.
func (drc *DefaultRemotingClient) createAndHandleTcpConn(address string) (conn net.Conn, err error) {
	dialed, dialErr := net.Dial("tcp", address)
	if dialErr != nil {
		glog.Error(dialErr)
		return nil, dialErr
	}
	//handle the connection: process whatever the remote side sends back on it
	go drc.handlerReceiveLoop(dialed, address)
	return dialed, nil
}
//releaseConn closes conn and removes it from connTable.
//
//The table entry for addr is deleted only when it still refers to conn. A
//connection can be released more than once (after a failed write and again
//when its receive loop exits); by the second call a fresh connection to the
//same address may already be in the table and must not be evicted.
//A nil conn is tolerated and nothing is closed in that case.
func (drc *DefaultRemotingClient) releaseConn(addr string, conn net.Conn) {
	if conn != nil {
		//the close error is deliberately ignored: a repeated release of the
		//same connection reports "already closed", which is expected
		_ = conn.Close()
	}
	drc.connTableLock.Lock()
	defer drc.connTableLock.Unlock()
	//interface comparison; connections produced by net.Dial are pointer
	//types, so this is an identity check
	if drc.connTable[addr] == conn {
		delete(drc.connTable, addr)
	}
}
//handlerReceiveLoop reads from conn until an error occurs, splits the byte
//stream into frames and hands each frame to handlerReceivedMessage on its
//own goroutine. When the loop ends the error is logged together with addr
//and the connection is released.
//
//Frame layout, as decoded here (all integers big-endian):
//  4 bytes  length of the rest of the frame
//  4 bytes  high byte: header serialization type, low 3 bytes: header length
//  header   header length bytes
//  body     length - 4 - header length bytes
//
//A frame whose length is below 4, or whose header length exceeds the frame,
//ends the loop with an error instead of producing a negative slice size.
func (drc *DefaultRemotingClient) handlerReceiveLoop(conn net.Conn, addr string) (err error) {
	defer func() {
		//the loop only ends on error: log it and drop the connection
		glog.Error(err, addr)
		drc.releaseConn(addr, conn)
	}()
	//size in bytes of each of the two length fields
	const lengthFieldSize = 4
	var (
		chunk       = make([]byte, 1024)
		buf         bytes.Buffer
		length      int32
		lengthKnown bool //true once length has been read for the current frame
	)
	for {
		var n int
		n, err = conn.Read(chunk)
		if err != nil {
			return
		}
		if _, err = buf.Write(chunk[:n]); err != nil {
			return
		}
		//drain every complete frame currently buffered
		for {
			if !lengthKnown {
				if buf.Len() < lengthFieldSize {
					break //wait for the rest of the length field
				}
				if err = binary.Read(&buf, binary.BigEndian, &length); err != nil {
					return
				}
				if length < lengthFieldSize {
					err = errors.New("invalid frame length:" + strconv.Itoa(int(length)))
					return
				}
				lengthKnown = true
			}
			if buf.Len() < int(length) {
				break //wait until the whole frame has arrived
			}
			//the frame is complete; the next pass starts with a new length
			lengthKnown = false

			var headerLength int32
			if err = binary.Read(&buf, binary.BigEndian, &headerLength); err != nil {
				return
			}
			realHeaderLen := headerLength & 0x00ffffff
			//the top byte carries the serialization type of the header
			headerSerializableType := byte(headerLength >> 24)
			bodyLength := length - lengthFieldSize - realHeaderLen
			if bodyLength < 0 {
				err = errors.New("invalid header length:" + strconv.Itoa(int(realHeaderLen)) +
					" for frame length:" + strconv.Itoa(int(length)))
				return
			}
			header := make([]byte, realHeaderLen)
			if _, err = buf.Read(header); err != nil {
				return
			}
			body := make([]byte, bodyLength)
			if bodyLength > 0 {
				if _, err = buf.Read(body); err != nil {
					return
				}
			}
			go drc.handlerReceivedMessage(conn, headerSerializableType, header, body)
		}
	}
}
//handlerReceivedMessage decodes one received frame into a command. A
//response is dispatched to handlerResponse in the current goroutine; anything
//else is treated as a request from the remote side and handled on a new
//goroutine.
func (drc *DefaultRemotingClient) handlerReceivedMessage(conn net.Conn, headerSerializableType byte, headBytes []byte, bodyBytes []byte) {
	command := drc.serializerHandler.decodeRemoteCommand(headerSerializableType, headBytes, bodyBytes)
	if !command.isResponseType() {
		go drc.handlerRequest(conn, command)
		return
	}
	drc.handlerResponse(command)
}
//handlerRequest passes a request received from the remote side to
//clientRequestProcessor. When the processor returns a command, it is marked
//as a response, given the request's opaque and written back on conn. A nil
//result means no reply is sent. Write errors are logged.
func (drc *DefaultRemotingClient) handlerRequest(conn net.Conn, cmd *RemotingCommand) {
	reply := drc.clientRequestProcessor(cmd)
	if reply == nil {
		return
	}
	reply.Opaque = cmd.Opaque
	reply.markResponseType()
	encodedHeader := drc.serializerHandler.encodeHeader(reply)
	//an empty addr tells sendRequest not to release the connection on failure
	if sendErr := drc.sendRequest(encodedHeader, reply.Body, conn, ""); sendErr != nil {
		glog.Error(sendErr)
	}
}
//handlerResponse completes the pending future that matches cmd.Opaque. The
//entry is removed from responseTable first; when none was registered the
//response is dropped. Otherwise cmd is attached to the future, its callback
//(if any) is run, and its Done channel (if any) is signalled.
func (drc *DefaultRemotingClient) handlerResponse(cmd *RemotingCommand) {
	future, lookupErr := drc.getResponse(cmd.Opaque)
	drc.removeResponse(cmd.Opaque)
	if lookupErr != nil {
		return
	}
	future.ResponseCommand = cmd
	if callback := future.InvokeCallback; callback != nil {
		callback(future)
	}
	if done := future.Done; done != nil {
		done <- true
	}
}
//ClearExpireResponse removes every pending response that was registered at
//least 30 seconds ago. For a removed entry that has a callback, the callback
//is invoked with nil and a warning is logged.
func (drc *DefaultRemotingClient) ClearExpireResponse() {
	const expireSeconds = 30
	for key, item := range drc.responseTable.Items() {
		future := item.(*ResponseFuture)
		if time.Now().Unix() < future.BeginTimestamp+expireSeconds {
			continue //still within its lifetime
		}
		drc.responseTable.Remove(key)
		if future.InvokeCallback == nil {
			continue
		}
		future.InvokeCallback(nil)
		glog.Warningf("remove time out request %v", future)
	}
}