Merge pull request #36 from zaihang365/33_using_writev

Impl: reduce syscalls and memory copies when writing multiple packages
diff --git a/client_test.go b/client_test.go
index 1888cda..5ca93f2 100644
--- a/client_test.go
+++ b/client_test.go
@@ -7,6 +7,7 @@
 	"os"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 )
@@ -122,7 +123,21 @@
 	ss.SetCompressType(CompressNone)
 	conn := ss.(*session).Connection.(*gettyTCPConn)
 	assert.True(t, conn.compress == CompressNone)
+	beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
+	beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
+	_, err = conn.send([]byte("hello"))
+	assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
+	assert.Nil(t, err)
+	assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
 	err = ss.WriteBytes([]byte("hello"))
+	assert.Equal(t, beforeWriteBytes+10, atomic.LoadUint32(&conn.writeBytes))
+	assert.Equal(t, beforeWritePkgNum+2, atomic.LoadUint32(&conn.writePkgNum))
+	assert.Nil(t, err)
+	var pkgs [][]byte
+	pkgs = append(pkgs, []byte("hello"), []byte("hello"))
+	_, err = conn.send(pkgs)
+	assert.Equal(t, beforeWritePkgNum+4, atomic.LoadUint32(&conn.writePkgNum))
+	assert.Equal(t, beforeWriteBytes+20, atomic.LoadUint32(&conn.writeBytes))
 	assert.Nil(t, err)
 	ss.SetCompressType(CompressSnappy)
 	assert.True(t, conn.compress == CompressSnappy)
@@ -194,7 +209,14 @@
 	_, err = udpConn.send(udpCtx)
 	assert.NotNil(t, err)
 	udpCtx.Pkg = []byte("hello")
+	beforeWriteBytes := atomic.LoadUint32(&udpConn.writeBytes)
 	_, err = udpConn.send(udpCtx)
+	assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&udpConn.writeBytes))
+	assert.Nil(t, err)
+
+	beforeWritePkgNum := atomic.LoadUint32(&udpConn.writePkgNum)
+	err = ss.WritePkg(udpCtx, 0)
+	assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&udpConn.writePkgNum))
 	assert.Nil(t, err)
 
 	clt.Close()
@@ -247,8 +269,13 @@
 	assert.Nil(t, err)
 	_, err = conn.send("hello")
 	assert.NotNil(t, err)
+	beforeWriteBytes := atomic.LoadUint32(&conn.writeBytes)
 	_, err = conn.send([]byte("hello"))
 	assert.Nil(t, err)
+	assert.Equal(t, beforeWriteBytes+5, atomic.LoadUint32(&conn.writeBytes))
+	beforeWritePkgNum := atomic.LoadUint32(&conn.writePkgNum)
+	err = ss.WriteBytes([]byte("hello"))
+	assert.Equal(t, beforeWritePkgNum+1, atomic.LoadUint32(&conn.writePkgNum))
 	err = conn.writePing()
 	assert.Nil(t, err)
 
diff --git a/connection.go b/connection.go
index cdadd51..d0fe626 100644
--- a/connection.go
+++ b/connection.go
@@ -265,9 +265,6 @@
 		length      int
 	)
 
-	if p, ok = pkg.([]byte); !ok {
-		return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
-	}
 	if t.compress == CompressNone && t.wTimeout > 0 {
 		// Optimization: update write deadline only if more than 25%
 		// of the last write deadline exceeded.
@@ -281,12 +278,28 @@
 		}
 	}
 
-	if length, err = t.writer.Write(p); err == nil {
-		atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
+	if buffers, ok := pkg.([][]byte); ok {
+		netBuf := net.Buffers(buffers)
+		if length, err := netBuf.WriteTo(t.conn); err == nil {
+			atomic.AddUint32(&t.writeBytes, (uint32)(length))
+			atomic.AddUint32(&t.writePkgNum, (uint32)(len(buffers)))
+		}
+		log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
+			t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
+		return int(length), perrors.WithStack(err)
 	}
-	log.Debugf("now:%s, length:%d, err:%v", currentTime, length, err)
-	return length, perrors.WithStack(err)
-	//return length, err
+
+	if p, ok = pkg.([]byte); ok {
+		if length, err = t.writer.Write(p); err == nil {
+			atomic.AddUint32(&t.writeBytes, (uint32)(len(p)))
+			atomic.AddUint32(&t.writePkgNum, 1)
+		}
+		log.Debugf("localAddr: %s, remoteAddr:%s, now:%s, length:%d, err:%s",
+			t.conn.LocalAddr(), t.conn.RemoteAddr(), currentTime, length, err)
+		return length, perrors.WithStack(err)
+	}
+
+	return 0, perrors.Errorf("illegal @pkg{%#v} type", pkg)
 }
 
 // close tcp connection
@@ -437,11 +450,11 @@
 
 	if length, _, err = u.conn.WriteMsgUDP(buf, nil, peerAddr); err == nil {
 		atomic.AddUint32(&u.writeBytes, (uint32)(len(buf)))
+		atomic.AddUint32(&u.writePkgNum, 1)
 	}
 	log.Debugf("WriteMsgUDP(peerAddr:%s) = {length:%d, error:%v}", peerAddr, length, err)
 
 	return length, perrors.WithStack(err)
-	//return length, err
 }
 
 // close udp connection
@@ -531,7 +544,7 @@
 	// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
 	_, b, e := w.conn.ReadMessage() // the first return value is message type.
 	if e == nil {
-		w.incReadPkgNum()
+		atomic.AddUint32(&w.readBytes, (uint32)(len(b)))
 	} else {
 		if websocket.IsUnexpectedCloseError(e, websocket.CloseGoingAway) {
 			log.Warnf("websocket unexpected close error: %v", e)
@@ -579,9 +592,9 @@
 	w.updateWriteDeadline()
 	if err = w.conn.WriteMessage(websocket.BinaryMessage, p); err == nil {
 		atomic.AddUint32(&w.writeBytes, (uint32)(len(p)))
+		atomic.AddUint32(&w.writePkgNum, 1)
 	}
 	return len(p), perrors.WithStack(err)
-	//return len(p), err
 }
 
 func (w *gettyWSConn) writePing() error {
diff --git a/session.go b/session.go
index 5d31fa0..5e9dc3f 100644
--- a/session.go
+++ b/session.go
@@ -301,7 +301,7 @@
 	s.lock.Lock()
 	defer s.lock.Unlock()
 	s.wQ = make(chan interface{}, writeQLen)
-	log.Debug("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
+	log.Debugf("%s, [session.SetWQLen] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
 }
 
 // set maximum wait time when session got error or got exit signal
@@ -404,10 +404,9 @@
 		}
 		_, err = s.Connection.send(pkg)
 		if err != nil {
-			log.Warn("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err)
+			log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err)
 			return perrors.WithStack(err)
 		}
-		s.incWritePkgNum()
 		return nil
 	}
 	select {
@@ -432,9 +431,6 @@
 	if _, err := s.Connection.send(pkg); err != nil {
 		return perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
 	}
-
-	s.incWritePkgNum()
-
 	return nil
 }
 
@@ -449,39 +445,10 @@
 		return s.WriteBytes(pkgs[0])
 	}
 
-	// get len
-	var (
-		l      int
-		err    error
-		length int
-		arrp   *[]byte
-		arr    []byte
-	)
-	length = 0
-	for i := 0; i < len(pkgs); i++ {
-		length += len(pkgs[i])
+	// TODO: sending multiple packages in one call is currently supported only for TCP.
+	if _, err := s.Connection.send(pkgs); err != nil {
+		return perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
 	}
-
-	// merge the pkgs
-	// arr = make([]byte, length)
-	arrp = gxbytes.GetBytes(length)
-	defer gxbytes.PutBytes(arrp)
-	arr = *arrp
-	l = 0
-	for i := 0; i < len(pkgs); i++ {
-		copy(arr[l:], pkgs[i])
-		l += len(pkgs[i])
-	}
-
-	if err = s.WriteBytes(arr); err != nil {
-		return perrors.WithStack(err)
-	}
-
-	num := len(pkgs) - 1
-	for i := 0; i < num; i++ {
-		s.incWritePkgNum()
-	}
-
 	return nil
 }
 
@@ -567,7 +534,7 @@
 				continue
 			}
 			if !flag {
-				log.Warn("[session.handleLoop] drop write out package %#v", outPkg)
+				log.Warnf("[session.handleLoop] drop write out package %#v", outPkg)
 				continue
 			}
 
@@ -958,6 +925,6 @@
 // or (session)handleLoop automatically. It's thread safe.
 func (s *session) Close() {
 	s.stop()
-	log.Info("%s closed now. its current gr num is %d",
+	log.Infof("%s closed now. its current gr num is %d",
 		s.sessionToken(), atomic.LoadInt32(&(s.grNum)))
 }