[ISSUE #344] feat(client): advance timeout (#351)

* feat(client): advance timeout

- use ctx timeout for remoting Client
- fix test
diff --git a/internal/client.go b/internal/client.go
index e566797..50fba22 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -134,7 +134,7 @@
 	InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
 		timeoutMillis time.Duration) (*remote.RemotingCommand, error)
 	InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
-		timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error
+		f func(*remote.RemotingCommand, error)) error
 	InvokeOneWay(ctx context.Context, addr string, request *remote.RemotingCommand,
 		timeoutMillis time.Duration) error
 	CheckClientInBroker()
@@ -383,15 +383,16 @@
 	if c.close {
 		return nil, ErrServiceState
 	}
-	return c.remoteClient.InvokeSync(ctx, addr, request, timeoutMillis)
+	ctx, _ = context.WithTimeout(ctx, timeoutMillis)
+	return c.remoteClient.InvokeSync(ctx, addr, request)
 }
 
 func (c *rmqClient) InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
-	timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
+	f func(*remote.RemotingCommand, error)) error {
 	if c.close {
 		return ErrServiceState
 	}
-	return c.remoteClient.InvokeAsync(ctx, addr, request, timeoutMillis, func(future *remote.ResponseFuture) {
+	return c.remoteClient.InvokeAsync(ctx, addr, request, func(future *remote.ResponseFuture) {
 		f(future.ResponseCommand, future.Err)
 	})
 
@@ -402,7 +403,7 @@
 	if c.close {
 		return ErrServiceState
 	}
-	return c.remoteClient.InvokeOneWay(ctx, addr, request, timeoutMillis)
+	return c.remoteClient.InvokeOneWay(ctx, addr, request)
 }
 
 func (c *rmqClient) CheckClientInBroker() {
@@ -444,7 +445,9 @@
 		data := value.(*BrokerData)
 		for id, addr := range data.BrokerAddresses {
 			cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
-			response, err := c.remoteClient.InvokeSync(context.Background(), addr, cmd, 3*time.Second)
+
+			ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
+			response, err := c.remoteClient.InvokeSync(ctx, addr, cmd)
 			if err != nil {
 				rlog.Warning("send heart beat to broker error", map[string]interface{}{
 					rlog.LogKeyUnderlayError: err,
@@ -544,7 +547,8 @@
 // PullMessage with sync
 func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) {
 	cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
-	res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd, 30*time.Second)
+	ctx, _ = context.WithTimeout(ctx, 30*time.Second)
+	res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/mock_client.go b/internal/mock_client.go
index 244cd0c..d01a51f 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -191,6 +191,18 @@
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", reflect.TypeOf((*MockInnerConsumer)(nil).IsUnitMode))
 }
 
+// GetConsumerRunningInfo mocks base method
+func (m *MockInnerConsumer) GetConsumerRunningInfo() *ConsumerRunningInfo {
+	ret := m.ctrl.Call(m, "GetConsumerRunningInfo")
+	ret0, _ := ret[0].(*ConsumerRunningInfo)
+	return ret0
+}
+
+// GetConsumerRunningInfo indicates an expected call of GetConsumerRunningInfo
+func (mr *MockInnerConsumerMockRecorder) GetConsumerRunningInfo() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConsumerRunningInfo", reflect.TypeOf((*MockInnerConsumer)(nil).GetConsumerRunningInfo))
+}
+
 // MockRMQClient is a mock of RMQClient interface
 type MockRMQClient struct {
 	ctrl     *gomock.Controller
@@ -270,15 +282,15 @@
 }
 
 // InvokeAsync mocks base method
-func (m *MockRMQClient) InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
-	ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, timeoutMillis, f)
+func (m *MockRMQClient) InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand, f func(*remote.RemotingCommand, error)) error {
+	ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, f)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
 // InvokeAsync indicates an expected call of InvokeAsync
-func (mr *MockRMQClientMockRecorder) InvokeAsync(ctx, addr, request, timeoutMillis, f interface{}) *gomock.Call {
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), ctx, addr, request, timeoutMillis, f)
+func (mr *MockRMQClientMockRecorder) InvokeAsync(ctx, addr, request, f interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), ctx, addr, request, f)
 }
 
 // InvokeOneWay mocks base method
@@ -399,10 +411,10 @@
 
 // UpdatePublishInfo mocks base method
 func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) {
-	m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
+	m.ctrl.Call(m, "UpdatePublishInfo", topic, data, changed)
 }
 
 // UpdatePublishInfo indicates an expected call of UpdatePublishInfo
-func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data interface{}) *gomock.Call {
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePublishInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdatePublishInfo), topic, data)
+func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data, changed interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePublishInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdatePublishInfo), topic, data, changed)
 }
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 0a983bd..65234e1 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -14,7 +14,6 @@
 See the License for the specific language governing permissions and
 limitations under the License.
 */
-
 // Code generated by MockGen. DO NOT EDIT.
 // Source: namesrv.go
 
@@ -22,9 +21,11 @@
 package internal
 
 import (
-	primitive "github.com/apache/rocketmq-client-go/primitive"
-	gomock "github.com/golang/mock/gomock"
-	reflect "reflect"
+	"reflect"
+
+	"github.com/golang/mock/gomock"
+
+	"github.com/apache/rocketmq-client-go/primitive"
 )
 
 // MockNamesrvs is a mock of Namesrvs interface
@@ -50,59 +51,51 @@
 	return m.recorder
 }
 
+// UpdateNameServerAddress mocks base method
+func (m *MockNamesrvs) UpdateNameServerAddress(nameServerDomain, instanceName string) {
+	m.ctrl.Call(m, "UpdateNameServerAddress", nameServerDomain, instanceName)
+}
+
+// UpdateNameServerAddress indicates an expected call of UpdateNameServerAddress
+func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress(nameServerDomain, instanceName interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNameServerAddress", reflect.TypeOf((*MockNamesrvs)(nil).UpdateNameServerAddress), nameServerDomain, instanceName)
+}
+
 // AddBroker mocks base method
 func (m *MockNamesrvs) AddBroker(routeData *TopicRouteData) {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "AddBroker", routeData)
 }
 
 // AddBroker indicates an expected call of AddBroker
 func (mr *MockNamesrvsMockRecorder) AddBroker(routeData interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBroker", reflect.TypeOf((*MockNamesrvs)(nil).AddBroker), routeData)
 }
 
 // cleanOfflineBroker mocks base method
 func (m *MockNamesrvs) cleanOfflineBroker() {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "cleanOfflineBroker")
 }
 
 // cleanOfflineBroker indicates an expected call of cleanOfflineBroker
 func (mr *MockNamesrvsMockRecorder) cleanOfflineBroker() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "cleanOfflineBroker", reflect.TypeOf((*MockNamesrvs)(nil).cleanOfflineBroker))
 }
 
-// UpdateNameServerAddress mocks base method
-func (m *MockNamesrvs) UpdateNameServerAddress(nameServer, instanceName string) {
-	m.ctrl.T.Helper()
-	m.ctrl.Call(m, "UpdateNameServerAddress", nameServer, instanceName)
-}
-
-// UpdateNameServerAddress indicates an expected call of UpdateNameServerAddress
-func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress(nameServer, instanceName string) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateNameServerAddress", reflect.TypeOf((*MockNamesrvs)(nil).UpdateNameServerAddress), nameServer, instanceName)
-}
-
 // UpdateTopicRouteInfo mocks base method
-func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool) {
-	m.ctrl.T.Helper()
+func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
 	ret := m.ctrl.Call(m, "UpdateTopicRouteInfo", topic)
 	ret0, _ := ret[0].(*TopicRouteData)
-	return ret0, changed
+	ret1, _ := ret[1].(bool)
+	return ret0, ret1
 }
 
 // UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
 func (mr *MockNamesrvsMockRecorder) UpdateTopicRouteInfo(topic interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicRouteInfo", reflect.TypeOf((*MockNamesrvs)(nil).UpdateTopicRouteInfo), topic)
 }
 
 // FetchPublishMessageQueues mocks base method
 func (m *MockNamesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FetchPublishMessageQueues", topic)
 	ret0, _ := ret[0].([]*primitive.MessageQueue)
 	ret1, _ := ret[1].(error)
@@ -111,13 +104,11 @@
 
 // FetchPublishMessageQueues indicates an expected call of FetchPublishMessageQueues
 func (mr *MockNamesrvsMockRecorder) FetchPublishMessageQueues(topic interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchPublishMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchPublishMessageQueues), topic)
 }
 
 // FindBrokerAddrByTopic mocks base method
 func (m *MockNamesrvs) FindBrokerAddrByTopic(topic string) string {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FindBrokerAddrByTopic", topic)
 	ret0, _ := ret[0].(string)
 	return ret0
@@ -125,13 +116,11 @@
 
 // FindBrokerAddrByTopic indicates an expected call of FindBrokerAddrByTopic
 func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByTopic(topic interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddrByTopic", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByTopic), topic)
 }
 
 // FindBrokerAddrByName mocks base method
 func (m *MockNamesrvs) FindBrokerAddrByName(brokerName string) string {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FindBrokerAddrByName", brokerName)
 	ret0, _ := ret[0].(string)
 	return ret0
@@ -139,13 +128,11 @@
 
 // FindBrokerAddrByName indicates an expected call of FindBrokerAddrByName
 func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByName(brokerName interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddrByName", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByName), brokerName)
 }
 
 // FindBrokerAddressInSubscribe mocks base method
 func (m *MockNamesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FindBrokerAddressInSubscribe", brokerName, brokerId, onlyThisBroker)
 	ret0, _ := ret[0].(*FindBrokerResult)
 	return ret0
@@ -153,13 +140,11 @@
 
 // FindBrokerAddressInSubscribe indicates an expected call of FindBrokerAddressInSubscribe
 func (mr *MockNamesrvsMockRecorder) FindBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddressInSubscribe", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddressInSubscribe), brokerName, brokerId, onlyThisBroker)
 }
 
 // FetchSubscribeMessageQueues mocks base method
 func (m *MockNamesrvs) FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FetchSubscribeMessageQueues", topic)
 	ret0, _ := ret[0].([]*primitive.MessageQueue)
 	ret1, _ := ret[1].(error)
@@ -168,13 +153,11 @@
 
 // FetchSubscribeMessageQueues indicates an expected call of FetchSubscribeMessageQueues
 func (mr *MockNamesrvsMockRecorder) FetchSubscribeMessageQueues(topic interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSubscribeMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchSubscribeMessageQueues), topic)
 }
 
 // AddrList mocks base method
 func (m *MockNamesrvs) AddrList() []string {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "AddrList")
 	ret0, _ := ret[0].([]string)
 	return ret0
@@ -182,6 +165,5 @@
 
 // AddrList indicates an expected call of AddrList
 func (mr *MockNamesrvsMockRecorder) AddrList() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddrList", reflect.TypeOf((*MockNamesrvs)(nil).AddrList))
 }
diff --git a/internal/remote/future.go b/internal/remote/future.go
index 4d3008f..5a83d6f 100644
--- a/internal/remote/future.go
+++ b/internal/remote/future.go
@@ -31,7 +31,6 @@
 	SendRequestOK   bool
 	Err             error
 	Opaque          int32
-	Timeout         time.Duration
 	callback        func(*ResponseFuture)
 	BeginTimestamp  time.Duration
 	Done            chan bool
@@ -40,11 +39,10 @@
 }
 
 // NewResponseFuture create ResponseFuture with opaque, timeout and callback
-func NewResponseFuture(ctx context.Context, opaque int32, timeout time.Duration, callback func(*ResponseFuture)) *ResponseFuture {
+func NewResponseFuture(ctx context.Context, opaque int32, callback func(*ResponseFuture)) *ResponseFuture {
 	return &ResponseFuture{
 		Opaque:         opaque,
 		Done:           make(chan bool),
-		Timeout:        timeout,
 		callback:       callback,
 		BeginTimestamp: time.Duration(time.Now().Unix()) * time.Second,
 		ctx:            ctx,
@@ -59,24 +57,17 @@
 	})
 }
 
-func (r *ResponseFuture) isTimeout() bool {
-	elapse := time.Duration(time.Now().Unix())*time.Second - r.BeginTimestamp
-	return elapse > r.Timeout
-}
-
 func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
 	var (
 		cmd *RemotingCommand
 		err error
 	)
-	ctx, cancel := context.WithTimeout(r.ctx, r.Timeout)
-	defer cancel()
 	for {
 		select {
 		case <-r.Done:
 			cmd, err = r.ResponseCommand, r.Err
 			goto done
-		case <-ctx.Done():
+		case <-r.ctx.Done():
 			err = utils.ErrRequestTimeout
 			r.Err = err
 			goto done
diff --git a/internal/remote/mock_remote_client.go b/internal/remote/mock_remote_client.go
index e264ab2..d5b86a5 100644
--- a/internal/remote/mock_remote_client.go
+++ b/internal/remote/mock_remote_client.go
@@ -1,19 +1,19 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
 // Code generated by MockGen. DO NOT EDIT.
 // Source: remote_client.go
 
@@ -25,7 +25,6 @@
 	primitive "github.com/apache/rocketmq-client-go/primitive"
 	gomock "github.com/golang/mock/gomock"
 	reflect "reflect"
-	time "time"
 )
 
 // MockRemotingClient is a mock of RemotingClient interface
@@ -76,40 +75,40 @@
 }
 
 // InvokeSync mocks base method
-func (m *MockRemotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
-	ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request, timeout)
+func (m *MockRemotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand) (*RemotingCommand, error) {
+	ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request)
 	ret0, _ := ret[0].(*RemotingCommand)
 	ret1, _ := ret[1].(error)
 	return ret0, ret1
 }
 
 // InvokeSync indicates an expected call of InvokeSync
-func (mr *MockRemotingClientMockRecorder) InvokeSync(ctx, addr, request, timeout interface{}) *gomock.Call {
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeSync), ctx, addr, request, timeout)
+func (mr *MockRemotingClientMockRecorder) InvokeSync(ctx, addr, request interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeSync), ctx, addr, request)
 }
 
 // InvokeAsync mocks base method
-func (m *MockRemotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
-	ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, timeout, callback)
+func (m *MockRemotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, callback func(*ResponseFuture)) error {
+	ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, callback)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
 // InvokeAsync indicates an expected call of InvokeAsync
-func (mr *MockRemotingClientMockRecorder) InvokeAsync(ctx, addr, request, timeout, callback interface{}) *gomock.Call {
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeAsync), ctx, addr, request, timeout, callback)
+func (mr *MockRemotingClientMockRecorder) InvokeAsync(ctx, addr, request, callback interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeAsync), ctx, addr, request, callback)
 }
 
 // InvokeOneWay mocks base method
-func (m *MockRemotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error {
-	ret := m.ctrl.Call(m, "InvokeOneWay", ctx, addr, request, timeout)
+func (m *MockRemotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand) error {
+	ret := m.ctrl.Call(m, "InvokeOneWay", ctx, addr, request)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
 // InvokeOneWay indicates an expected call of InvokeOneWay
-func (mr *MockRemotingClientMockRecorder) InvokeOneWay(ctx, addr, request, timeout interface{}) *gomock.Call {
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRemotingClient)(nil).InvokeOneWay), ctx, addr, request, timeout)
+func (mr *MockRemotingClientMockRecorder) InvokeOneWay(ctx, addr, request interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRemotingClient)(nil).InvokeOneWay), ctx, addr, request)
 }
 
 // ShutDown mocks base method
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 26c3bea..ce35c6c 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -24,7 +24,6 @@
 	"io"
 	"net"
 	"sync"
-	"time"
 
 	"github.com/apache/rocketmq-client-go/primitive"
 
@@ -41,9 +40,9 @@
 type RemotingClient interface {
 	RegisterRequestFunc(code int16, f ClientRequestFunc)
 	RegisterInterceptor(interceptors ...primitive.Interceptor)
-	InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error)
-	InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error
-	InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error
+	InvokeSync(ctx context.Context, addr string, request *RemotingCommand) (*RemotingCommand, error)
+	InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, callback func(*ResponseFuture)) error
+	InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand) error
 	ShutDown()
 }
 
@@ -69,12 +68,12 @@
 }
 
 // TODO: merge sync and async model. sync should run on async model by blocking on chan
-func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
+func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand) (*RemotingCommand, error) {
 	conn, err := c.connect(ctx, addr)
 	if err != nil {
 		return nil, err
 	}
-	resp := NewResponseFuture(ctx, request.Opaque, timeout, nil)
+	resp := NewResponseFuture(ctx, request.Opaque, nil)
 	c.responseTable.Store(resp.Opaque, resp)
 	defer c.responseTable.Delete(request.Opaque)
 	err = c.sendRequest(conn, request)
@@ -86,12 +85,12 @@
 }
 
 // InvokeAsync send request without blocking, just return immediately.
-func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
+func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, callback func(*ResponseFuture)) error {
 	conn, err := c.connect(ctx, addr)
 	if err != nil {
 		return err
 	}
-	resp := NewResponseFuture(ctx, request.Opaque, timeout, callback)
+	resp := NewResponseFuture(ctx, request.Opaque, callback)
 	c.responseTable.Store(resp.Opaque, resp)
 	err = c.sendRequest(conn, request)
 	if err != nil {
@@ -109,7 +108,7 @@
 	}
 }
 
-func (c *remotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error {
+func (c *remotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand) error {
 	conn, err := c.connect(ctx, addr)
 	if err != nil {
 		return err
@@ -125,7 +124,6 @@
 	if ok {
 		return conn.(*tcpConnWrapper), nil
 	}
-
 	tcpConn, err := initConn(ctx, addr)
 	if err != nil {
 		return nil, err
diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go
index 1d2c033..f3aa6a2 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -33,7 +33,7 @@
 )
 
 func TestNewResponseFuture(t *testing.T) {
-	future := NewResponseFuture(context.Background(), 10, time.Duration(1000), nil)
+	future := NewResponseFuture(context.Background(), 10, nil)
 	if future.Opaque != 10 {
 		t.Errorf("wrong ResponseFuture's opaque. want=%d, got=%d", 10, future.Opaque)
 	}
@@ -43,10 +43,6 @@
 	if future.Err != nil {
 		t.Errorf("wrong RespnseFuture's Err. want=<nil>, got=%v", future.Err)
 	}
-	if future.Timeout != time.Duration(1000) {
-		t.Errorf("wrong ResponseFuture's TimeoutMills. want=%d, got=%d",
-			future.Timeout, time.Duration(1000))
-	}
 	if future.callback != nil {
 		t.Errorf("wrong ResponseFuture's callback. want=<nil>, got!=<nil>")
 	}
@@ -63,7 +59,7 @@
 			r.ResponseCommand.Remark = r.ResponseCommand.Remark + "Go Client"
 		}
 	}
-	future := NewResponseFuture(context.Background(), 10, time.Duration(1000), callback)
+	future := NewResponseFuture(context.Background(), 10, callback)
 	future.ResponseCommand = NewRemotingCommand(200,
 		nil, nil)
 
@@ -83,22 +79,14 @@
 
 }
 
-func TestResponseFutureIsTimeout(t *testing.T) {
-	future := NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
-	if future.isTimeout() != false {
-		t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", false, future.isTimeout())
-	}
-	time.Sleep(time.Duration(1000) * time.Millisecond)
-	assert.True(t, future.isTimeout(), "ResponseFuture's istimeout should be true")
-}
-
 func TestResponseFutureWaitResponse(t *testing.T) {
-	future := NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
+	ctx, _ := context.WithTimeout(context.Background(), time.Duration(1000))
+	future := NewResponseFuture(ctx, 10, nil)
 	if _, err := future.waitResponse(); err != utils.ErrRequestTimeout {
 		t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
 			utils.ErrRequestTimeout, err)
 	}
-	future = NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
+	future = NewResponseFuture(context.Background(), 10, nil)
 	responseError := errors.New("response error")
 	go func() {
 		time.Sleep(100 * time.Millisecond)
@@ -109,7 +97,7 @@
 		t.Errorf("wrong ResponseFuture waitResponse. want=%v. got=%v",
 			responseError, err)
 	}
-	future = NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
+	future = NewResponseFuture(context.Background(), 10, nil)
 	responseRemotingCommand := NewRemotingCommand(202, nil, nil)
 	go func() {
 		time.Sleep(100 * time.Millisecond)
@@ -175,7 +163,7 @@
 	go func() {
 		clientSend.Wait()
 		receiveCommand, err := client.InvokeSync(context.Background(), addr,
-			clientSendRemtingCommand, time.Second)
+			clientSendRemtingCommand)
 		if err != nil {
 			t.Fatalf("failed to invoke synchronous. %s", err)
 		} else {
@@ -238,7 +226,7 @@
 			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
 			t.Logf("[Send: %d] asychronous message", index)
 			sendRemotingCommand := randomNewRemotingCommand()
-			err := client.InvokeAsync(context.Background(), addr, sendRemotingCommand, time.Second, func(r *ResponseFuture) {
+			err := client.InvokeAsync(context.Background(), addr, sendRemotingCommand, func(r *ResponseFuture) {
 				t.Logf("[Receive: %d] asychronous message response", index)
 				if string(sendRemotingCommand.Body) != string(r.ResponseCommand.Body) {
 					t.Errorf("wrong response message. want=%s, got=%s", string(sendRemotingCommand.Body),
@@ -304,8 +292,9 @@
 	clientSend.Add(1)
 	go func() {
 		clientSend.Wait()
-		err := client.InvokeAsync(context.Background(), addr, clientSendRemtingCommand,
-			time.Duration(1000), func(r *ResponseFuture) {
+		ctx, _ := context.WithTimeout(context.Background(), time.Duration(10*time.Second))
+		err := client.InvokeAsync(ctx, addr, clientSendRemtingCommand,
+			func(r *ResponseFuture) {
 				assert.NotNil(t, r.Err)
 				assert.Equal(t, utils.ErrRequestTimeout, r.Err)
 				wg.Done()
@@ -349,7 +338,7 @@
 	clientSend.Add(1)
 	go func() {
 		clientSend.Wait()
-		err := client.InvokeOneWay(context.Background(), addr, clientSendRemtingCommand, 3*time.Second)
+		err := client.InvokeOneWay(context.Background(), addr, clientSendRemtingCommand)
 		if err != nil {
 			t.Fatalf("failed to invoke synchronous. %s", err)
 		}
diff --git a/internal/route.go b/internal/route.go
index 128ac87..f4d4116 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -333,7 +333,8 @@
 	)
 	for i := 0; i < s.Size(); i++ {
 		rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
-		response, err = s.nameSrvClient.InvokeSync(context.Background(), s.getNameServerAddress(), rc, requestTimeout)
+		ctx, _ := context.WithTimeout(context.Background(), requestTimeout)
+		response, err = s.nameSrvClient.InvokeSync(ctx, s.getNameServerAddress(), rc)
 
 		if err == nil {
 			break
diff --git a/internal/route_test.go b/internal/route_test.go
index b24f312..4c5c22f 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -21,7 +21,6 @@
 	"context"
 	"sync"
 	"testing"
-	"time"
 
 	"github.com/golang/mock/gomock"
 	"github.com/pkg/errors"
@@ -50,8 +49,8 @@
 		Convey("When marshal producer trace data", func() {
 
 			count := 0
-			remotingCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
-				func(ctx context.Context, addr string, request *remote.RemotingCommand, timeout time.Duration) (*remote.RemotingCommand, error) {
+			remotingCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+				func(ctx context.Context, addr string, request *remote.RemotingCommand) (*remote.RemotingCommand, error) {
 					count++
 					if count < 3 {
 						return nil, errors.New("not existed")
diff --git a/internal/trace.go b/internal/trace.go
index 00074ee..212ee60 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -404,7 +404,8 @@
 	}
 
 	var req = td.buildSendRequest(mq, msg)
-	err := td.cli.InvokeAsync(context.Background(), addr, req, 5*time.Second, func(command *remote.RemotingCommand, e error) {
+	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
+	err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
 		if e != nil {
 			rlog.Error("send trace data error", map[string]interface{}{
 				"traceData": data,
diff --git a/producer/producer.go b/producer/producer.go
index 05bcb15..f82104f 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -241,7 +241,8 @@
 		return errors.Errorf("topic=%s route info not found", mq.Topic)
 	}
 
-	return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second, func(command *remote.RemotingCommand, err error) {
+	ctx, _ = context.WithTimeout(ctx, 3*time.Second)
+	return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
 		resp := new(primitive.SendResult)
 		if err != nil {
 			h(ctx, nil, err)
diff --git a/producer/producer_test.go b/producer/producer_test.go
index f26fa25..387e9b0 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -20,7 +20,6 @@
 import (
 	"context"
 	"testing"
-	"time"
 
 	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/assert"
@@ -190,9 +189,9 @@
 
 	mockB4Send(p)
 
-	client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+	client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
 		func(ctx context.Context, addr string, request *remote.RemotingCommand,
-			timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
+			f func(*remote.RemotingCommand, error)) error {
 			// mock invoke callback
 			f(nil, nil)
 			return nil