blob: 26c3beab91501d7f1f545566d4d9974957887877 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package remote
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"io"
"net"
"sync"
"time"
"github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
)
type ClientRequestFunc func(*RemotingCommand, net.Addr) *RemotingCommand
type TcpOption struct {
// TODO
}
//go:generate mockgen -source remote_client.go -destination mock_remote_client.go -self_package github.com/apache/rocketmq-client-go/internal/remote --package remote RemotingClient
type RemotingClient interface {
RegisterRequestFunc(code int16, f ClientRequestFunc)
RegisterInterceptor(interceptors ...primitive.Interceptor)
InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error)
InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error
InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error
ShutDown()
}
var _ RemotingClient = &remotingClient{}
type remotingClient struct {
responseTable sync.Map
connectionTable sync.Map
option TcpOption
processors map[int16]ClientRequestFunc
connectionLocker sync.Mutex
interceptor primitive.Interceptor
}
func NewRemotingClient() *remotingClient {
return &remotingClient{
processors: make(map[int16]ClientRequestFunc),
}
}
func (c *remotingClient) RegisterRequestFunc(code int16, f ClientRequestFunc) {
c.processors[code] = f
}
// TODO: merge sync and async model. sync should run on async model by blocking on chan
func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
conn, err := c.connect(ctx, addr)
if err != nil {
return nil, err
}
resp := NewResponseFuture(ctx, request.Opaque, timeout, nil)
c.responseTable.Store(resp.Opaque, resp)
defer c.responseTable.Delete(request.Opaque)
err = c.sendRequest(conn, request)
if err != nil {
return nil, err
}
resp.SendRequestOK = true
return resp.waitResponse()
}
// InvokeAsync send request without blocking, just return immediately.
func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
conn, err := c.connect(ctx, addr)
if err != nil {
return err
}
resp := NewResponseFuture(ctx, request.Opaque, timeout, callback)
c.responseTable.Store(resp.Opaque, resp)
err = c.sendRequest(conn, request)
if err != nil {
return err
}
resp.SendRequestOK = true
go c.receiveAsync(resp)
return nil
}
func (c *remotingClient) receiveAsync(f *ResponseFuture) {
_, err := f.waitResponse()
if err != nil {
f.executeInvokeCallback()
}
}
func (c *remotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error {
conn, err := c.connect(ctx, addr)
if err != nil {
return err
}
return c.sendRequest(conn, request)
}
func (c *remotingClient) connect(ctx context.Context, addr string) (*tcpConnWrapper, error) {
//it needs additional locker.
c.connectionLocker.Lock()
defer c.connectionLocker.Unlock()
conn, ok := c.connectionTable.Load(addr)
if ok {
return conn.(*tcpConnWrapper), nil
}
tcpConn, err := initConn(ctx, addr)
if err != nil {
return nil, err
}
c.connectionTable.Store(addr, tcpConn)
go c.receiveResponse(tcpConn)
return tcpConn, nil
}
func (c *remotingClient) receiveResponse(r *tcpConnWrapper) {
var err error
header := make([]byte, 4)
defer c.closeConnection(r)
for {
if err != nil {
if r.isClosed(err) {
return
}
rlog.Error("conn error, close connection", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
break
}
_, err = io.ReadFull(r, header)
if err != nil {
if r.isClosed(err) {
return
}
rlog.Error("io ReadFull error when read header", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
continue
}
var length int32
err = binary.Read(bytes.NewReader(header), binary.BigEndian, &length)
if err != nil {
rlog.Error("binary decode header error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
continue
}
buf := make([]byte, length)
_, err = io.ReadFull(r, buf)
if err != nil {
if r.isClosed(err) {
return
}
rlog.Error("io ReadFull error when read payload", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
continue
}
cmd, err := decode(buf)
if err != nil {
rlog.Error("decode RemotingCommand error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
continue
}
c.processCMD(cmd, r)
}
}
func (c *remotingClient) processCMD(cmd *RemotingCommand, r *tcpConnWrapper) {
if cmd.isResponseType() {
resp, exist := c.responseTable.Load(cmd.Opaque)
if exist {
c.responseTable.Delete(cmd.Opaque)
responseFuture := resp.(*ResponseFuture)
go func() {
responseFuture.ResponseCommand = cmd
responseFuture.executeInvokeCallback()
if responseFuture.Done != nil {
responseFuture.Done <- true
}
}()
}
} else {
f := c.processors[cmd.Code]
if f != nil {
// single goroutine will be deadlock
// TODO: optimize with goroutine pool, https://github.com/apache/rocketmq-client-go/issues/307
go func() {
res := f(cmd, r.RemoteAddr())
if res != nil {
res.Opaque = cmd.Opaque
res.Flag |= 1 << 0
err := c.sendRequest(r, res)
if err != nil {
rlog.Warning("send response to broker error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
"responseCode": res.Code,
})
}
}
}()
} else {
rlog.Warning("receive broker's requests, but no func to handle", map[string]interface{}{
"responseCode": cmd.Code,
})
}
}
}
func (c *remotingClient) createScanner(r io.Reader) *bufio.Scanner {
scanner := bufio.NewScanner(r)
// max batch size: 32, max message size: 4Mb
scanner.Buffer(make([]byte, 1024*1024), 128*1024*1024)
scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
defer func() {
if err := recover(); err != nil {
rlog.Error("scanner split panic", map[string]interface{}{
"panic": err,
})
}
}()
if !atEOF {
if len(data) >= 4 {
var length int32
err := binary.Read(bytes.NewReader(data[0:4]), binary.BigEndian, &length)
if err != nil {
rlog.Error("split data error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return 0, nil, err
}
if int(length)+4 <= len(data) {
return int(length) + 4, data[4 : length+4], nil
}
}
}
return 0, nil, nil
})
return scanner
}
func (c *remotingClient) sendRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
var err error
if c.interceptor != nil {
err = c.interceptor(context.Background(), request, nil, func(ctx context.Context, req, reply interface{}) error {
return c.doRequest(conn, request)
})
} else {
err = c.doRequest(conn, request)
}
return err
}
func (c *remotingClient) doRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
content, err := encode(request)
if err != nil {
return err
}
_, err = conn.Write(content)
if err != nil {
c.closeConnection(conn)
return err
}
return nil
}
func (c *remotingClient) closeConnection(toCloseConn *tcpConnWrapper) {
c.connectionTable.Range(func(key, value interface{}) bool {
if value == toCloseConn {
c.connectionTable.Delete(key)
return false
} else {
return true
}
})
}
func (c *remotingClient) ShutDown() {
c.responseTable.Range(func(key, value interface{}) bool {
c.responseTable.Delete(key)
return true
})
c.connectionTable.Range(func(key, value interface{}) bool {
conn := value.(*tcpConnWrapper)
err := conn.destroy()
if err != nil {
rlog.Warning("close remoting conn error", map[string]interface{}{
"remote": conn.RemoteAddr(),
rlog.LogKeyUnderlayError: err,
})
} else {
rlog.Info("remoting conn closed", map[string]interface{}{
"remote": conn.RemoteAddr(),
})
}
return true
})
}
func (c *remotingClient) RegisterInterceptor(interceptors ...primitive.Interceptor) {
c.interceptor = primitive.ChainInterceptors(interceptors...)
}