[ISSUE #294] Resolve problem print error log when producer/consumer shutdown (#308)

* resolve issue-294
diff --git a/.travis.yml b/.travis.yml
index 9666607..e68fd31 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,9 +19,9 @@
 
 before_script:
   - cd ${TRAVIS_HOME}
-  - wget http://us.mirrors.quenda.co/apache/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
-  - unzip rocketmq-all-4.3.2-bin-release.zip
-  - cd rocketmq-all-4.3.2-bin-release
+  - wget http://us.mirrors.quenda.co/apache/rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip
+  - unzip rocketmq-all-4.5.2-bin-release.zip
+  - cd rocketmq-all-4.5.2-bin-release
   - perl -i -pe's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh
   - nohup sh bin/mqnamesrv &
   - nohup sh bin/mqbroker -n localhost:9876 &
diff --git a/go.mod b/go.mod
index 716c4d2..f715e21 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@
 	github.com/tidwall/gjson v1.2.1
 	github.com/tidwall/match v1.0.1 // indirect
 	github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
+	go.uber.org/atomic v1.5.1
 	stathat.com/c/consistent v1.0.0
 )
 
diff --git a/go.sum b/go.sum
index 9d64cc9..bdb5b0a 100644
--- a/go.sum
+++ b/go.sum
@@ -34,14 +34,23 @@
 github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
 github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 h1:rQ229MBgvW68s1/g6f1/63TgYwYxfF4E+bi/KC19P8g=
 github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
+go.uber.org/atomic v1.5.1 h1:rsqfU5vBkVknbhUGbAUwQKR2H4ItV8tjJ+6kJX4cxHM=
+go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
 golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg=
 golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 7961896..26c3bea 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -117,16 +117,16 @@
 	return c.sendRequest(conn, request)
 }
 
-func (c *remotingClient) connect(ctx context.Context, addr string) (net.Conn, error) {
+func (c *remotingClient) connect(ctx context.Context, addr string) (*tcpConnWrapper, error) {
 	//it needs additional locker.
 	c.connectionLocker.Lock()
 	defer c.connectionLocker.Unlock()
 	conn, ok := c.connectionTable.Load(addr)
 	if ok {
-		return conn.(net.Conn), nil
+		return conn.(*tcpConnWrapper), nil
 	}
-	var d net.Dialer
-	tcpConn, err := d.DialContext(ctx, "tcp", addr)
+
+	tcpConn, err := initConn(ctx, addr)
 	if err != nil {
 		return nil, err
 	}
@@ -135,21 +135,27 @@
 	return tcpConn, nil
 }
 
-func (c *remotingClient) receiveResponse(r net.Conn) {
+func (c *remotingClient) receiveResponse(r *tcpConnWrapper) {
 	var err error
 	header := make([]byte, 4)
+	defer c.closeConnection(r)
 	for {
 		if err != nil {
+			if r.isClosed(err) {
+				return
+			}
 			rlog.Error("conn error, close connection", map[string]interface{}{
 				rlog.LogKeyUnderlayError: err,
 			})
-			c.closeConnection(r)
 			break
 		}
 
 		_, err = io.ReadFull(r, header)
 		if err != nil {
-			rlog.Error("io ReadFull error", map[string]interface{}{
+			if r.isClosed(err) {
+				return
+			}
+			rlog.Error("io ReadFull error when read header", map[string]interface{}{
 				rlog.LogKeyUnderlayError: err,
 			})
 			continue
@@ -167,7 +173,10 @@
 		buf := make([]byte, length)
 		_, err = io.ReadFull(r, buf)
 		if err != nil {
-			rlog.Error("io ReadFull error", map[string]interface{}{
+			if r.isClosed(err) {
+				return
+			}
+			rlog.Error("io ReadFull error when read payload", map[string]interface{}{
 				rlog.LogKeyUnderlayError: err,
 			})
 			continue
@@ -184,7 +193,7 @@
 	}
 }
 
-func (c *remotingClient) processCMD(cmd *RemotingCommand, r net.Conn) {
+func (c *remotingClient) processCMD(cmd *RemotingCommand, r *tcpConnWrapper) {
 	if cmd.isResponseType() {
 		resp, exist := c.responseTable.Load(cmd.Opaque)
 		if exist {
@@ -201,7 +210,9 @@
 	} else {
 		f := c.processors[cmd.Code]
 		if f != nil {
-			go func() { // 单个goroutine会造成死锁
+			// single goroutine will be deadlock
+			// TODO: optimize with goroutine pool, https://github.com/apache/rocketmq-client-go/issues/307
+			go func() {
 				res := f(cmd, r.RemoteAddr())
 				if res != nil {
 					res.Opaque = cmd.Opaque
@@ -257,7 +268,7 @@
 	return scanner
 }
 
-func (c *remotingClient) sendRequest(conn net.Conn, request *RemotingCommand) error {
+func (c *remotingClient) sendRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
 	var err error
 	if c.interceptor != nil {
 		err = c.interceptor(context.Background(), request, nil, func(ctx context.Context, req, reply interface{}) error {
@@ -269,7 +280,7 @@
 	return err
 }
 
-func (c *remotingClient) doRequest(conn net.Conn, request *RemotingCommand) error {
+func (c *remotingClient) doRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
 	content, err := encode(request)
 	if err != nil {
 		return err
@@ -282,7 +293,7 @@
 	return nil
 }
 
-func (c *remotingClient) closeConnection(toCloseConn net.Conn) {
+func (c *remotingClient) closeConnection(toCloseConn *tcpConnWrapper) {
 	c.connectionTable.Range(func(key, value interface{}) bool {
 		if value == toCloseConn {
 			c.connectionTable.Delete(key)
@@ -299,14 +310,22 @@
 		return true
 	})
 	c.connectionTable.Range(func(key, value interface{}) bool {
-		conn := value.(net.Conn)
-		conn.Close()
+		conn := value.(*tcpConnWrapper)
+		err := conn.destroy()
+		if err != nil {
+			rlog.Warning("close remoting conn error", map[string]interface{}{
+				"remote":                 conn.RemoteAddr(),
+				rlog.LogKeyUnderlayError: err,
+			})
+		} else {
+			rlog.Info("remoting conn closed", map[string]interface{}{
+				"remote": conn.RemoteAddr(),
+			})
+		}
 		return true
 	})
 }
 
 func (c *remotingClient) RegisterInterceptor(interceptors ...primitive.Interceptor) {
-
 	c.interceptor = primitive.ChainInterceptors(interceptors...)
-
 }
diff --git a/internal/remote/tcp_conn.go b/internal/remote/tcp_conn.go
new file mode 100644
index 0000000..0c82d52
--- /dev/null
+++ b/internal/remote/tcp_conn.go
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package remote
+
+import (
+	"context"
+	"net"
+
+	"go.uber.org/atomic"
+)
+
+// TODO: Adding TCP Connections Pool, https://github.com/apache/rocketmq-client-go/issues/298
+type tcpConnWrapper struct {
+	net.Conn
+	closed atomic.Bool
+}
+
+func initConn(ctx context.Context, addr string) (*tcpConnWrapper, error) {
+	var d net.Dialer
+	conn, err := d.DialContext(ctx, "tcp", addr)
+	if err != nil {
+		return nil, err
+	}
+	return &tcpConnWrapper{
+		Conn: conn,
+	}, nil
+}
+
+func (wrapper *tcpConnWrapper) destroy() error {
+	wrapper.closed.Swap(true)
+	return wrapper.Conn.Close()
+}
+
+func (wrapper *tcpConnWrapper) isClosed(err error) bool {
+	if !wrapper.closed.Load() {
+		return false
+	}
+
+	opErr, ok := err.(*net.OpError)
+	if !ok {
+		return false
+	}
+
+	return opErr.Err.Error() == "use of closed network connection"
+}