Merge pull request #36 from zaihang365/33_using_writev
Impl: reduce syscalls and memory copies when writing multiple packages
diff --git a/client_test.go b/client_test.go
index 1888cda..5ca93f2 100644
--- a/client_test.go
+++ b/client_test.go
@@ -7,6 +7,7 @@
"os"
"strconv"
"sync"
+ "sync/atomic"
"testing"
"time"
)
@@ -122,7 +123,21 @@
ss.SetCompressType(CompressNone)
conn := ss.(*session).Connection.(*gettyTCPConn)
assert.True(t, conn.compress == CompressNone)
+ beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
+ beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
+ _, err = conn.send([]byte("hello"))
+ assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
+ assert.Nil(t, err)
+ assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
err = ss.WriteBytes([]byte("hello"))
+ assert.Equal(t, beforeWriteBytes+10, atomic.LoadUint32(&conn.writeBytes))
+ assert.Equal(t, beforeWritePkgNum+2, atomic.LoadUint32(&conn.writePkgNum))
+ assert.Nil(t, err)
+ var pkgs [][]byte
+ pkgs = append(pkgs, []byte("hello"), []byte("hello"))
+ _, err = conn.send(pkgs)
+ assert.Equal(t, beforeWritePkgNum+4, atomic.LoadUint32(&conn.writePkgNum))
+ assert.Equal(t, beforeWriteBytes+20, atomic.LoadUint32(&conn.writeBytes))
assert.Nil(t, err)
ss.SetCompressType(CompressSnappy)
assert.True(t, conn.compress == CompressSnappy)
@@ -194,7 +209,14 @@
_, err = udpConn.send(udpCtx)
assert.NotNil(t, err)
udpCtx.Pkg = []byte("hello")
+ beforeWriteBytes := atomic.LoadUint32(&udpConn.writeBytes)
_, err = udpConn.send(udpCtx)
+ assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&udpConn.writeBytes))
+ assert.Nil(t, err)
+
+ beforeWritePkgNum := atomic.LoadUint32(&udpConn.writePkgNum)
+ err = ss.WritePkg(udpCtx, 0)
+ assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&udpConn.writePkgNum))
assert.Nil(t, err)
clt.Close()
@@ -247,8 +269,13 @@
assert.Nil(t, err)
_, err = conn.send("hello")
assert.NotNil(t, err)
+ beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
_, err = conn.send([]byte("hello"))
assert.Nil(t, err)
+ assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
+ beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
+ err = ss.WriteBytes([]byte("hello"))
+ assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
err = conn.writePing()
assert.Nil(t, err)
diff --git a/connection.go b/connection.go
index cdadd51..d0fe626 100644
--- a/connection.go
+++ b/connection.go
@@ -265,9 +265,6 @@
length int
)
- if p, ok = pkg.([]byte); !ok {
- return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
- }
if t.compress == CompressNone && t.wTimeout > 0 {
// Optimization: update write deadline only if more than 25%
// of the last write deadline exceeded.
@@ -281,12 +278,28 @@
}
}
- if length, err = t.writer.Write(p); err == nil {
- atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
+ if buffers, ok := pkg.([][]byte); ok {
+ netBuf := net.Buffers(buffers) // WriteTo has a pointer receiver, so it needs an addressable value
+ written, werr := netBuf.WriteTo(t.conn) // block-scoped results: an if-initializer `:=` would hide them from the log/return below
+ if werr == nil {
+ atomic.AddUint32(&t.writeBytes, (uint32)(written))
+ atomic.AddUint32(&t.writePkgNum, (uint32)(len(buffers)))
+ }
+ log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%v", t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, written, werr) // %v: a nil error renders as <nil>, not %!s(<nil>)
+ return int(written), perrors.WithStack(werr) // report the real byte count and write error to the caller
}
- log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err)
- return length, perrors.WithStack(err)
- //return length, err
+
+ if p, ok = pkg.([]byte); ok {
+ if length, err = t.writer.Write(p); err == nil {
+ atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
+ atomic.AddUint32(&t.writePkgNum, 1)
+ }
+ log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%v",
+ t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
+ return length, perrors.WithStack(err)
+ }
+
+ return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
}
// close tcp connection
@@ -437,11 +450,11 @@
if length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr); err == nil {
atomic.AddUint32(&u.writeBytes, (uint32)(len(buf)))
+ atomic.AddUint32(&u.writePkgNum, 1)
}
log.Debugf("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%v}", peerAddr, length, err)
return length, perrors.WithStack(err)
- //return length, err
}
// close udp connection
@@ -531,7 +544,7 @@
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
_, b, e := w.conn.ReadMessage() // the first return value is message type.
if e == nil {
- w.incReadPkgNum()
+ atomic.AddUint32(&w.readBytes, (uint32)(len(b)))
} else {
if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) {
log.Warnf("websocket unexpected close error: %v", e)
@@ -579,9 +592,9 @@
w.updateWriteDeadline()
if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil {
atomic.AddUint32(&w.writeBytes, (uint32)(len(p)))
+ atomic.AddUint32(&w.writePkgNum, 1)
}
return len(p), perrors.WithStack(err)
- //return len(p), err
}
func (w *gettyWSConn) writePing() error {
diff --git a/session.go b/session.go
index 5d31fa0..5e9dc3f 100644
--- a/session.go
+++ b/session.go
@@ -301,7 +301,7 @@
s.lock.Lock()
defer s.lock.Unlock()
s.wQ = make(chan interface{}, writeQLen)
- log.Debug("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
+ log.Debugf("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
}
// set maximum wait time when session got error or got exit signal
@@ -404,10 +404,9 @@
}
_, err = s.Connection.send(pkg)
if err != nil {
- log.Warn("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err)
+ log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err)
return perrors.WithStack(err)
}
- s.incWritePkgNum()
return nil
}
select {
@@ -432,9 +431,6 @@
if _, err := s.Connection.send(pkg); err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
}
-
- s.incWritePkgNum()
-
return nil
}
@@ -449,39 +445,10 @@
return s.WriteBytes(pkgs[0])
}
- // get len
- var (
- l int
- err error
- length int
- arrp *[]byte
- arr []byte
- )
- length = 0
- for i := 0; i < len(pkgs); i++ {
- length += len(pkgs[i])
+ // TODO Currently, only TCP is supported.
+ if _, err := s.Connection.send(pkgs); err != nil {
+ return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
}
-
- // merge the pkgs
- // arr = make([]byte, length)
- arrp = gxbytes.GetBytes(length)
- defer gxbytes.PutBytes(arrp)
- arr = *arrp
- l = 0
- for i := 0; i < len(pkgs); i++ {
- copy(arr[l:], pkgs[i])
- l += len(pkgs[i])
- }
-
- if err = s.WriteBytes(arr); err != nil {
- return perrors.WithStack(err)
- }
-
- num := len(pkgs) - 1
- for i := 0; i < num; i++ {
- s.incWritePkgNum()
- }
-
return nil
}
@@ -567,7 +534,7 @@
continue
}
if !flag {
- log.Warn("[session.handleLoop] drop write out package %#v", outPkg)
+ log.Warnf("[session.handleLoop] drop write out package %#v", outPkg)
continue
}
@@ -958,6 +925,6 @@
// or (session)handleLoop automatically. It's thread safe.
func (s *session) Close() {
s.stop()
- log.Info("%s closed now. its current gr num is %d",
+ log.Infof("%s closed now. its current gr num is %d",
s.sessionToken(), atomic.LoadInt32(&(s.grNum)))
}