[ISSUE #294] Resolve problem print error log when producer/consumer shutdown (#308)
* resolve issue-294
diff --git a/.travis.yml b/.travis.yml
index 9666607..e68fd31 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,9 +19,9 @@
before_script:
- cd ${TRAVIS_HOME}
- - wget http://us.mirrors.quenda.co/apache/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip
- - unzip rocketmq-all-4.3.2-bin-release.zip
- - cd rocketmq-all-4.3.2-bin-release
+ - wget http://us.mirrors.quenda.co/apache/rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip
+ - unzip rocketmq-all-4.5.2-bin-release.zip
+ - cd rocketmq-all-4.5.2-bin-release
- perl -i -pe's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh
- nohup sh bin/mqnamesrv &
- nohup sh bin/mqbroker -n localhost:9876 &
diff --git a/go.mod b/go.mod
index 716c4d2..f715e21 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@
github.com/tidwall/gjson v1.2.1
github.com/tidwall/match v1.0.1 // indirect
github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
+ go.uber.org/atomic v1.5.1
stathat.com/c/consistent v1.0.0
)
diff --git a/go.sum b/go.sum
index 9d64cc9..bdb5b0a 100644
--- a/go.sum
+++ b/go.sum
@@ -34,14 +34,23 @@
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 h1:rQ229MBgvW68s1/g6f1/63TgYwYxfF4E+bi/KC19P8g=
github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
+go.uber.org/atomic v1.5.1 h1:rsqfU5vBkVknbhUGbAUwQKR2H4ItV8tjJ+6kJX4cxHM=
+go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86JNJe4spst6Ff8MjvPUdPg=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
+golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 7961896..26c3bea 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -117,16 +117,16 @@
return c.sendRequest(conn, request)
}
-func (c *remotingClient) connect(ctx context.Context, addr string) (net.Conn, error) {
+func (c *remotingClient) connect(ctx context.Context, addr string) (*tcpConnWrapper, error) {
//it needs additional locker.
c.connectionLocker.Lock()
defer c.connectionLocker.Unlock()
conn, ok := c.connectionTable.Load(addr)
if ok {
- return conn.(net.Conn), nil
+ return conn.(*tcpConnWrapper), nil
}
- var d net.Dialer
- tcpConn, err := d.DialContext(ctx, "tcp", addr)
+
+ tcpConn, err := initConn(ctx, addr)
if err != nil {
return nil, err
}
@@ -135,21 +135,27 @@
return tcpConn, nil
}
-func (c *remotingClient) receiveResponse(r net.Conn) {
+func (c *remotingClient) receiveResponse(r *tcpConnWrapper) {
var err error
header := make([]byte, 4)
+ defer c.closeConnection(r)
for {
if err != nil {
+ if r.isClosed(err) {
+ return
+ }
rlog.Error("conn error, close connection", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
- c.closeConnection(r)
break
}
_, err = io.ReadFull(r, header)
if err != nil {
- rlog.Error("io ReadFull error", map[string]interface{}{
+ if r.isClosed(err) {
+ return
+ }
+ rlog.Error("io ReadFull error when read header", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
continue
@@ -167,7 +173,10 @@
buf := make([]byte, length)
_, err = io.ReadFull(r, buf)
if err != nil {
- rlog.Error("io ReadFull error", map[string]interface{}{
+ if r.isClosed(err) {
+ return
+ }
+ rlog.Error("io ReadFull error when read payload", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
continue
@@ -184,7 +193,7 @@
}
}
-func (c *remotingClient) processCMD(cmd *RemotingCommand, r net.Conn) {
+func (c *remotingClient) processCMD(cmd *RemotingCommand, r *tcpConnWrapper) {
if cmd.isResponseType() {
resp, exist := c.responseTable.Load(cmd.Opaque)
if exist {
@@ -201,7 +210,9 @@
} else {
f := c.processors[cmd.Code]
if f != nil {
- go func() { // 单个goroutine会造成死锁
+ // single goroutine will be deadlock
+ // TODO: optimize with goroutine pool, https://github.com/apache/rocketmq-client-go/issues/307
+ go func() {
res := f(cmd, r.RemoteAddr())
if res != nil {
res.Opaque = cmd.Opaque
@@ -257,7 +268,7 @@
return scanner
}
-func (c *remotingClient) sendRequest(conn net.Conn, request *RemotingCommand) error {
+func (c *remotingClient) sendRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
var err error
if c.interceptor != nil {
err = c.interceptor(context.Background(), request, nil, func(ctx context.Context, req, reply interface{}) error {
@@ -269,7 +280,7 @@
return err
}
-func (c *remotingClient) doRequest(conn net.Conn, request *RemotingCommand) error {
+func (c *remotingClient) doRequest(conn *tcpConnWrapper, request *RemotingCommand) error {
content, err := encode(request)
if err != nil {
return err
@@ -282,7 +293,7 @@
return nil
}
-func (c *remotingClient) closeConnection(toCloseConn net.Conn) {
+func (c *remotingClient) closeConnection(toCloseConn *tcpConnWrapper) {
c.connectionTable.Range(func(key, value interface{}) bool {
if value == toCloseConn {
c.connectionTable.Delete(key)
@@ -299,14 +310,22 @@
return true
})
c.connectionTable.Range(func(key, value interface{}) bool {
- conn := value.(net.Conn)
- conn.Close()
+ conn := value.(*tcpConnWrapper)
+ err := conn.destroy()
+ if err != nil {
+ rlog.Warning("close remoting conn error", map[string]interface{}{
+ "remote": conn.RemoteAddr(),
+ rlog.LogKeyUnderlayError: err,
+ })
+ } else {
+ rlog.Info("remoting conn closed", map[string]interface{}{
+ "remote": conn.RemoteAddr(),
+ })
+ }
return true
})
}
func (c *remotingClient) RegisterInterceptor(interceptors ...primitive.Interceptor) {
-
c.interceptor = primitive.ChainInterceptors(interceptors...)
-
}
diff --git a/internal/remote/tcp_conn.go b/internal/remote/tcp_conn.go
new file mode 100644
index 0000000..0c82d52
--- /dev/null
+++ b/internal/remote/tcp_conn.go
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package remote
+
+import (
+ "context"
+ "net"
+
+ "go.uber.org/atomic"
+)
+
+// TODO: Adding TCP Connections Pool, https://github.com/apache/rocketmq-client-go/issues/298
+type tcpConnWrapper struct {
+ net.Conn
+ closed atomic.Bool
+}
+
+func initConn(ctx context.Context, addr string) (*tcpConnWrapper, error) {
+ var d net.Dialer
+ conn, err := d.DialContext(ctx, "tcp", addr)
+ if err != nil {
+ return nil, err
+ }
+ return &tcpConnWrapper{
+ Conn: conn,
+ }, nil
+}
+
+func (wrapper *tcpConnWrapper) destroy() error {
+ wrapper.closed.Swap(true)
+ return wrapper.Conn.Close()
+}
+
+func (wrapper *tcpConnWrapper) isClosed(err error) bool {
+ if !wrapper.closed.Load() {
+ return false
+ }
+
+ opErr, ok := err.(*net.OpError)
+ if !ok {
+ return false
+ }
+
+ return opErr.Err.Error() == "use of closed network connection"
+}