[ISSUE #344] feat(client): advance timeout (#351)
* feat(client): advance timeout
- use ctx timeout for remoting Client
- fix test
diff --git a/internal/client.go b/internal/client.go
index e566797..50fba22 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -134,7 +134,7 @@
InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) (*remote.RemotingCommand, error)
InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
- timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error
+ f func(*remote.RemotingCommand, error)) error
InvokeOneWay(ctx context.Context, addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) error
CheckClientInBroker()
@@ -383,15 +383,16 @@
if c.close {
return nil, ErrServiceState
}
- return c.remoteClient.InvokeSync(ctx, addr, request, timeoutMillis)
+ ctx, _ = context.WithTimeout(ctx, timeoutMillis)
+ return c.remoteClient.InvokeSync(ctx, addr, request)
}
func (c *rmqClient) InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
- timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
+ f func(*remote.RemotingCommand, error)) error {
if c.close {
return ErrServiceState
}
- return c.remoteClient.InvokeAsync(ctx, addr, request, timeoutMillis, func(future *remote.ResponseFuture) {
+ return c.remoteClient.InvokeAsync(ctx, addr, request, func(future *remote.ResponseFuture) {
f(future.ResponseCommand, future.Err)
})
@@ -402,7 +403,7 @@
if c.close {
return ErrServiceState
}
- return c.remoteClient.InvokeOneWay(ctx, addr, request, timeoutMillis)
+ return c.remoteClient.InvokeOneWay(ctx, addr, request)
}
func (c *rmqClient) CheckClientInBroker() {
@@ -444,7 +445,9 @@
data := value.(*BrokerData)
for id, addr := range data.BrokerAddresses {
cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
- response, err := c.remoteClient.InvokeSync(context.Background(), addr, cmd, 3*time.Second)
+
+ ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
+ response, err := c.remoteClient.InvokeSync(ctx, addr, cmd)
if err != nil {
rlog.Warning("send heart beat to broker error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
@@ -544,7 +547,8 @@
// PullMessage with sync
func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) {
cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
- res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd, 30*time.Second)
+ ctx, _ = context.WithTimeout(ctx, 30*time.Second)
+ res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd)
if err != nil {
return nil, err
}
diff --git a/internal/mock_client.go b/internal/mock_client.go
index 244cd0c..d01a51f 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -191,6 +191,18 @@
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", reflect.TypeOf((*MockInnerConsumer)(nil).IsUnitMode))
}
+// GetConsumerRunningInfo mocks base method
+func (m *MockInnerConsumer) GetConsumerRunningInfo() *ConsumerRunningInfo {
+ ret := m.ctrl.Call(m, "GetConsumerRunningInfo")
+ ret0, _ := ret[0].(*ConsumerRunningInfo)
+ return ret0
+}
+
+// GetConsumerRunningInfo indicates an expected call of GetConsumerRunningInfo
+func (mr *MockInnerConsumerMockRecorder) GetConsumerRunningInfo() *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConsumerRunningInfo", reflect.TypeOf((*MockInnerConsumer)(nil).GetConsumerRunningInfo))
+}
+
// MockRMQClient is a mock of RMQClient interface
type MockRMQClient struct {
ctrl *gomock.Controller
@@ -270,15 +282,15 @@
}
// InvokeAsync mocks base method
-func (m *MockRMQClient) InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
- ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, timeoutMillis, f)
+func (m *MockRMQClient) InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand, f func(*remote.RemotingCommand, error)) error {
+ ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, f)
ret0, _ := ret[0].(error)
return ret0
}
// InvokeAsync indicates an expected call of InvokeAsync
-func (mr *MockRMQClientMockRecorder) InvokeAsync(ctx, addr, request, timeoutMillis, f interface{}) *gomock.Call {
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), ctx, addr, request, timeoutMillis, f)
+func (mr *MockRMQClientMockRecorder) InvokeAsync(ctx, addr, request, f interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), ctx, addr, request, f)
}
// InvokeOneWay mocks base method
@@ -399,10 +411,10 @@
// UpdatePublishInfo mocks base method
func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) {
- m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
+ m.ctrl.Call(m, "UpdatePublishInfo", topic, data, changed)
}
// UpdatePublishInfo indicates an expected call of UpdatePublishInfo
-func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data interface{}) *gomock.Call {
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePublishInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdatePublishInfo), topic, data)
+func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data, changed interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePublishInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdatePublishInfo), topic, data, changed)
}
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 0a983bd..65234e1 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -14,7 +14,6 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
-
// Code generated by MockGen. DO NOT EDIT.
// Source: namesrv.go
@@ -22,9 +21,11 @@
package internal
import (
- primitive "github.com/apache/rocketmq-client-go/primitive"
- gomock "github.com/golang/mock/gomock"
- reflect "reflect"
+ "reflect"
+
+ "github.com/golang/mock/gomock"
+
+ "github.com/apache/rocketmq-client-go/primitive"
)
// MockNamesrvs is a mock of Namesrvs interface
@@ -50,59 +51,51 @@
return m.recorder
}
+// UpdateNameServerAddress mocks base method
+func (m *MockNamesrvs) UpdateNameServerAddress(nameServerDomain, instanceName string) {
+ m.ctrl.Call(m, "UpdateNameServerAddress", nameServerDomain, instanceName)
+}
+
+// UpdateNameServerAddress indicates an expected call of UpdateNameServerAddress
+func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress(nameServerDomain, instanceName interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNameServerAddress", reflect.TypeOf((*MockNamesrvs)(nil).UpdateNameServerAddress), nameServerDomain, instanceName)
+}
+
// AddBroker mocks base method
func (m *MockNamesrvs) AddBroker(routeData *TopicRouteData) {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "AddBroker", routeData)
}
// AddBroker indicates an expected call of AddBroker
func (mr *MockNamesrvsMockRecorder) AddBroker(routeData interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBroker", reflect.TypeOf((*MockNamesrvs)(nil).AddBroker), routeData)
}
// cleanOfflineBroker mocks base method
func (m *MockNamesrvs) cleanOfflineBroker() {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "cleanOfflineBroker")
}
// cleanOfflineBroker indicates an expected call of cleanOfflineBroker
func (mr *MockNamesrvsMockRecorder) cleanOfflineBroker() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "cleanOfflineBroker", reflect.TypeOf((*MockNamesrvs)(nil).cleanOfflineBroker))
}
-// UpdateNameServerAddress mocks base method
-func (m *MockNamesrvs) UpdateNameServerAddress(nameServer, instanceName string) {
- m.ctrl.T.Helper()
- m.ctrl.Call(m, "UpdateNameServerAddress", nameServer, instanceName)
-}
-
-// UpdateNameServerAddress indicates an expected call of UpdateNameServerAddress
-func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress(nameServer, instanceName string) *gomock.Call {
- mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNameServerAddress", reflect.TypeOf((*MockNamesrvs)(nil).UpdateNameServerAddress), nameServer, instanceName)
-}
-
// UpdateTopicRouteInfo mocks base method
-func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool) {
- m.ctrl.T.Helper()
+func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
ret := m.ctrl.Call(m, "UpdateTopicRouteInfo", topic)
ret0, _ := ret[0].(*TopicRouteData)
- return ret0, changed
+ ret1, _ := ret[1].(bool)
+ return ret0, ret1
}
// UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
func (mr *MockNamesrvsMockRecorder) UpdateTopicRouteInfo(topic interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicRouteInfo", reflect.TypeOf((*MockNamesrvs)(nil).UpdateTopicRouteInfo), topic)
}
// FetchPublishMessageQueues mocks base method
func (m *MockNamesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchPublishMessageQueues", topic)
ret0, _ := ret[0].([]*primitive.MessageQueue)
ret1, _ := ret[1].(error)
@@ -111,13 +104,11 @@
// FetchPublishMessageQueues indicates an expected call of FetchPublishMessageQueues
func (mr *MockNamesrvsMockRecorder) FetchPublishMessageQueues(topic interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchPublishMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchPublishMessageQueues), topic)
}
// FindBrokerAddrByTopic mocks base method
func (m *MockNamesrvs) FindBrokerAddrByTopic(topic string) string {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindBrokerAddrByTopic", topic)
ret0, _ := ret[0].(string)
return ret0
@@ -125,13 +116,11 @@
// FindBrokerAddrByTopic indicates an expected call of FindBrokerAddrByTopic
func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByTopic(topic interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddrByTopic", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByTopic), topic)
}
// FindBrokerAddrByName mocks base method
func (m *MockNamesrvs) FindBrokerAddrByName(brokerName string) string {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindBrokerAddrByName", brokerName)
ret0, _ := ret[0].(string)
return ret0
@@ -139,13 +128,11 @@
// FindBrokerAddrByName indicates an expected call of FindBrokerAddrByName
func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByName(brokerName interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddrByName", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByName), brokerName)
}
// FindBrokerAddressInSubscribe mocks base method
func (m *MockNamesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindBrokerAddressInSubscribe", brokerName, brokerId, onlyThisBroker)
ret0, _ := ret[0].(*FindBrokerResult)
return ret0
@@ -153,13 +140,11 @@
// FindBrokerAddressInSubscribe indicates an expected call of FindBrokerAddressInSubscribe
func (mr *MockNamesrvsMockRecorder) FindBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddressInSubscribe", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddressInSubscribe), brokerName, brokerId, onlyThisBroker)
}
// FetchSubscribeMessageQueues mocks base method
func (m *MockNamesrvs) FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchSubscribeMessageQueues", topic)
ret0, _ := ret[0].([]*primitive.MessageQueue)
ret1, _ := ret[1].(error)
@@ -168,13 +153,11 @@
// FetchSubscribeMessageQueues indicates an expected call of FetchSubscribeMessageQueues
func (mr *MockNamesrvsMockRecorder) FetchSubscribeMessageQueues(topic interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSubscribeMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchSubscribeMessageQueues), topic)
}
// AddrList mocks base method
func (m *MockNamesrvs) AddrList() []string {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddrList")
ret0, _ := ret[0].([]string)
return ret0
@@ -182,6 +165,5 @@
// AddrList indicates an expected call of AddrList
func (mr *MockNamesrvsMockRecorder) AddrList() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddrList", reflect.TypeOf((*MockNamesrvs)(nil).AddrList))
}
diff --git a/internal/remote/future.go b/internal/remote/future.go
index 4d3008f..5a83d6f 100644
--- a/internal/remote/future.go
+++ b/internal/remote/future.go
@@ -31,7 +31,6 @@
SendRequestOK bool
Err error
Opaque int32
- Timeout time.Duration
callback func(*ResponseFuture)
BeginTimestamp time.Duration
Done chan bool
@@ -40,11 +39,10 @@
}
// NewResponseFuture create ResponseFuture with opaque, timeout and callback
-func NewResponseFuture(ctx context.Context, opaque int32, timeout time.Duration, callback func(*ResponseFuture)) *ResponseFuture {
+func NewResponseFuture(ctx context.Context, opaque int32, callback func(*ResponseFuture)) *ResponseFuture {
return &ResponseFuture{
Opaque: opaque,
Done: make(chan bool),
- Timeout: timeout,
callback: callback,
BeginTimestamp: time.Duration(time.Now().Unix()) * time.Second,
ctx: ctx,
@@ -59,24 +57,17 @@
})
}
-func (r *ResponseFuture) isTimeout() bool {
- elapse := time.Duration(time.Now().Unix())*time.Second - r.BeginTimestamp
- return elapse > r.Timeout
-}
-
func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
var (
cmd *RemotingCommand
err error
)
- ctx, cancel := context.WithTimeout(r.ctx, r.Timeout)
- defer cancel()
for {
select {
case <-r.Done:
cmd, err = r.ResponseCommand, r.Err
goto done
- case <-ctx.Done():
+ case <-r.ctx.Done():
err = utils.ErrRequestTimeout
r.Err = err
goto done
diff --git a/internal/remote/mock_remote_client.go b/internal/remote/mock_remote_client.go
index e264ab2..d5b86a5 100644
--- a/internal/remote/mock_remote_client.go
+++ b/internal/remote/mock_remote_client.go
@@ -1,19 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
// Code generated by MockGen. DO NOT EDIT.
// Source: remote_client.go
@@ -25,7 +25,6 @@
primitive "github.com/apache/rocketmq-client-go/primitive"
gomock "github.com/golang/mock/gomock"
reflect "reflect"
- time "time"
)
// MockRemotingClient is a mock of RemotingClient interface
@@ -76,40 +75,40 @@
}
// InvokeSync mocks base method
-func (m *MockRemotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
- ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request, timeout)
+func (m *MockRemotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand) (*RemotingCommand, error) {
+ ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request)
ret0, _ := ret[0].(*RemotingCommand)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// InvokeSync indicates an expected call of InvokeSync
-func (mr *MockRemotingClientMockRecorder) InvokeSync(ctx, addr, request, timeout interface{}) *gomock.Call {
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeSync), ctx, addr, request, timeout)
+func (mr *MockRemotingClientMockRecorder) InvokeSync(ctx, addr, request interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeSync), ctx, addr, request)
}
// InvokeAsync mocks base method
-func (m *MockRemotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
- ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, timeout, callback)
+func (m *MockRemotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, callback func(*ResponseFuture)) error {
+ ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, callback)
ret0, _ := ret[0].(error)
return ret0
}
// InvokeAsync indicates an expected call of InvokeAsync
-func (mr *MockRemotingClientMockRecorder) InvokeAsync(ctx, addr, request, timeout, callback interface{}) *gomock.Call {
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeAsync), ctx, addr, request, timeout, callback)
+func (mr *MockRemotingClientMockRecorder) InvokeAsync(ctx, addr, request, callback interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeAsync), ctx, addr, request, callback)
}
// InvokeOneWay mocks base method
-func (m *MockRemotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error {
- ret := m.ctrl.Call(m, "InvokeOneWay", ctx, addr, request, timeout)
+func (m *MockRemotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand) error {
+ ret := m.ctrl.Call(m, "InvokeOneWay", ctx, addr, request)
ret0, _ := ret[0].(error)
return ret0
}
// InvokeOneWay indicates an expected call of InvokeOneWay
-func (mr *MockRemotingClientMockRecorder) InvokeOneWay(ctx, addr, request, timeout interface{}) *gomock.Call {
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRemotingClient)(nil).InvokeOneWay), ctx, addr, request, timeout)
+func (mr *MockRemotingClientMockRecorder) InvokeOneWay(ctx, addr, request interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRemotingClient)(nil).InvokeOneWay), ctx, addr, request)
}
// ShutDown mocks base method
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 26c3bea..ce35c6c 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -24,7 +24,6 @@
"io"
"net"
"sync"
- "time"
"github.com/apache/rocketmq-client-go/primitive"
@@ -41,9 +40,9 @@
type RemotingClient interface {
RegisterRequestFunc(code int16, f ClientRequestFunc)
RegisterInterceptor(interceptors ...primitive.Interceptor)
- InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error)
- InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error
- InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error
+ InvokeSync(ctx context.Context, addr string, request *RemotingCommand) (*RemotingCommand, error)
+ InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, callback func(*ResponseFuture)) error
+ InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand) error
ShutDown()
}
@@ -69,12 +68,12 @@
}
// TODO: merge sync and async model. sync should run on async model by blocking on chan
-func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
+func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand) (*RemotingCommand, error) {
conn, err := c.connect(ctx, addr)
if err != nil {
return nil, err
}
- resp := NewResponseFuture(ctx, request.Opaque, timeout, nil)
+ resp := NewResponseFuture(ctx, request.Opaque, nil)
c.responseTable.Store(resp.Opaque, resp)
defer c.responseTable.Delete(request.Opaque)
err = c.sendRequest(conn, request)
@@ -86,12 +85,12 @@
}
// InvokeAsync send request without blocking, just return immediately.
-func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
+func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, callback func(*ResponseFuture)) error {
conn, err := c.connect(ctx, addr)
if err != nil {
return err
}
- resp := NewResponseFuture(ctx, request.Opaque, timeout, callback)
+ resp := NewResponseFuture(ctx, request.Opaque, callback)
c.responseTable.Store(resp.Opaque, resp)
err = c.sendRequest(conn, request)
if err != nil {
@@ -109,7 +108,7 @@
}
}
-func (c *remotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error {
+func (c *remotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand) error {
conn, err := c.connect(ctx, addr)
if err != nil {
return err
@@ -125,7 +124,6 @@
if ok {
return conn.(*tcpConnWrapper), nil
}
-
tcpConn, err := initConn(ctx, addr)
if err != nil {
return nil, err
diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go
index 1d2c033..f3aa6a2 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -33,7 +33,7 @@
)
func TestNewResponseFuture(t *testing.T) {
- future := NewResponseFuture(context.Background(), 10, time.Duration(1000), nil)
+ future := NewResponseFuture(context.Background(), 10, nil)
if future.Opaque != 10 {
t.Errorf("wrong ResponseFuture's opaque. want=%d, got=%d", 10, future.Opaque)
}
@@ -43,10 +43,6 @@
if future.Err != nil {
t.Errorf("wrong RespnseFuture's Err. want=<nil>, got=%v", future.Err)
}
- if future.Timeout != time.Duration(1000) {
- t.Errorf("wrong ResponseFuture's TimeoutMills. want=%d, got=%d",
- future.Timeout, time.Duration(1000))
- }
if future.callback != nil {
t.Errorf("wrong ResponseFuture's callback. want=<nil>, got!=<nil>")
}
@@ -63,7 +59,7 @@
r.ResponseCommand.Remark = r.ResponseCommand.Remark + "Go Client"
}
}
- future := NewResponseFuture(context.Background(), 10, time.Duration(1000), callback)
+ future := NewResponseFuture(context.Background(), 10, callback)
future.ResponseCommand = NewRemotingCommand(200,
nil, nil)
@@ -83,22 +79,14 @@
}
-func TestResponseFutureIsTimeout(t *testing.T) {
- future := NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
- if future.isTimeout() != false {
- t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", false, future.isTimeout())
- }
- time.Sleep(time.Duration(1000) * time.Millisecond)
- assert.True(t, future.isTimeout(), "ResponseFuture's istimeout should be true")
-}
-
func TestResponseFutureWaitResponse(t *testing.T) {
- future := NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
+ ctx, _ := context.WithTimeout(context.Background(), time.Duration(1000))
+ future := NewResponseFuture(ctx, 10, nil)
if _, err := future.waitResponse(); err != utils.ErrRequestTimeout {
t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
utils.ErrRequestTimeout, err)
}
- future = NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
+ future = NewResponseFuture(context.Background(), 10, nil)
responseError := errors.New("response error")
go func() {
time.Sleep(100 * time.Millisecond)
@@ -109,7 +97,7 @@
t.Errorf("wrong ResponseFuture waitResponse. want=%v. got=%v",
responseError, err)
}
- future = NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
+ future = NewResponseFuture(context.Background(), 10, nil)
responseRemotingCommand := NewRemotingCommand(202, nil, nil)
go func() {
time.Sleep(100 * time.Millisecond)
@@ -175,7 +163,7 @@
go func() {
clientSend.Wait()
receiveCommand, err := client.InvokeSync(context.Background(), addr,
- clientSendRemtingCommand, time.Second)
+ clientSendRemtingCommand)
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
} else {
@@ -238,7 +226,7 @@
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
t.Logf("[Send: %d] asychronous message", index)
sendRemotingCommand := randomNewRemotingCommand()
- err := client.InvokeAsync(context.Background(), addr, sendRemotingCommand, time.Second, func(r *ResponseFuture) {
+ err := client.InvokeAsync(context.Background(), addr, sendRemotingCommand, func(r *ResponseFuture) {
t.Logf("[Receive: %d] asychronous message response", index)
if string(sendRemotingCommand.Body) != string(r.ResponseCommand.Body) {
t.Errorf("wrong response message. want=%s, got=%s", string(sendRemotingCommand.Body),
@@ -304,8 +292,9 @@
clientSend.Add(1)
go func() {
clientSend.Wait()
- err := client.InvokeAsync(context.Background(), addr, clientSendRemtingCommand,
- time.Duration(1000), func(r *ResponseFuture) {
+ ctx, _ := context.WithTimeout(context.Background(), time.Duration(10*time.Second))
+ err := client.InvokeAsync(ctx, addr, clientSendRemtingCommand,
+ func(r *ResponseFuture) {
assert.NotNil(t, r.Err)
assert.Equal(t, utils.ErrRequestTimeout, r.Err)
wg.Done()
@@ -349,7 +338,7 @@
clientSend.Add(1)
go func() {
clientSend.Wait()
- err := client.InvokeOneWay(context.Background(), addr, clientSendRemtingCommand, 3*time.Second)
+ err := client.InvokeOneWay(context.Background(), addr, clientSendRemtingCommand)
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
}
diff --git a/internal/route.go b/internal/route.go
index 128ac87..f4d4116 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -333,7 +333,8 @@
)
for i := 0; i < s.Size(); i++ {
rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
- response, err = s.nameSrvClient.InvokeSync(context.Background(), s.getNameServerAddress(), rc, requestTimeout)
+ ctx, _ := context.WithTimeout(context.Background(), requestTimeout)
+ response, err = s.nameSrvClient.InvokeSync(ctx, s.getNameServerAddress(), rc)
if err == nil {
break
diff --git a/internal/route_test.go b/internal/route_test.go
index b24f312..4c5c22f 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -21,7 +21,6 @@
"context"
"sync"
"testing"
- "time"
"github.com/golang/mock/gomock"
"github.com/pkg/errors"
@@ -50,8 +49,8 @@
Convey("When marshal producer trace data", func() {
count := 0
- remotingCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
- func(ctx context.Context, addr string, request *remote.RemotingCommand, timeout time.Duration) (*remote.RemotingCommand, error) {
+ remotingCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+ func(ctx context.Context, addr string, request *remote.RemotingCommand) (*remote.RemotingCommand, error) {
count++
if count < 3 {
return nil, errors.New("not existed")
diff --git a/internal/trace.go b/internal/trace.go
index 00074ee..212ee60 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -404,7 +404,8 @@
}
var req = td.buildSendRequest(mq, msg)
- err := td.cli.InvokeAsync(context.Background(), addr, req, 5*time.Second, func(command *remote.RemotingCommand, e error) {
+ ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
+ err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
if e != nil {
rlog.Error("send trace data error", map[string]interface{}{
"traceData": data,
diff --git a/producer/producer.go b/producer/producer.go
index 05bcb15..f82104f 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -241,7 +241,8 @@
return errors.Errorf("topic=%s route info not found", mq.Topic)
}
- return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second, func(command *remote.RemotingCommand, err error) {
+ ctx, _ = context.WithTimeout(ctx, 3*time.Second)
+ return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
resp := new(primitive.SendResult)
if err != nil {
h(ctx, nil, err)
diff --git a/producer/producer_test.go b/producer/producer_test.go
index f26fa25..387e9b0 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -20,7 +20,6 @@
import (
"context"
"testing"
- "time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
@@ -190,9 +189,9 @@
mockB4Send(p)
- client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+ client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, addr string, request *remote.RemotingCommand,
- timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
+ f func(*remote.RemotingCommand, error)) error {
// mock invoke callback
f(nil, nil)
return nil