blob: 5bfa4da1042aec6ea8aec3080925e66119a750a4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package getty
import (
"bytes"
"context"
"fmt"
"io"
"net"
"runtime"
"sync"
"time"
)
import (
gxbytes "github.com/dubbogo/gost/bytes"
gxcontext "github.com/dubbogo/gost/context"
gxtime "github.com/dubbogo/gost/time"
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic"
)
const (
maxReadBufLen = 4 * 1024
netIOTimeout = 1e9 // 1s
period = 60 * 1e9 // 1 minute
pendingDuration = 3e9
// MaxWheelTimeSpan 900s, 15 minute
MaxWheelTimeSpan = 900e9
maxPacketLen = 16 * 1024
defaultSessionName = "session"
defaultTCPSessionName = "tcp-session"
defaultUDPSessionName = "udp-session"
defaultWSSessionName = "ws-session"
defaultWSSSessionName = "wss-session"
outputFormat = "session %s, Read Bytes: %d, Write Bytes: %d, Read Pkgs: %d, Write Pkgs: %d"
)
var defaultTimerWheel *gxtime.TimerWheel
func init() {
gxtime.InitDefaultTimerWheel()
defaultTimerWheel = gxtime.GetDefaultTimerWheel()
}
// Session wrap connection between the server and the client
type Session interface {
Connection
Reset()
Conn() net.Conn
Stat() string
IsClosed() bool
// EndPoint get endpoint type
EndPoint() EndPoint
SetMaxMsgLen(int)
SetName(string)
SetEventListener(EventListener)
SetPkgHandler(ReadWriter)
SetReader(Reader)
SetWriter(Writer)
SetCronPeriod(int)
SetWaitTime(time.Duration)
GetAttribute(interface{}) interface{}
SetAttribute(interface{}, interface{})
RemoveAttribute(interface{})
// WritePkg the Writer will invoke this function. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap.
// for udp session, the first parameter should be UDPContext.
// totalBytesLength: @pkg stream bytes length after encoding @pkg.
// sendBytesLength: stream bytes length that sent out successfully.
// err: maybe it has illegal data, encoding error, or write out system error.
WritePkg(pkg interface{}, timeout time.Duration) (totalBytesLength int, sendBytesLength int, err error)
WriteBytes([]byte) (int, error)
WriteBytesArray(...[]byte) (int, error)
Close()
}
// getty base session
type session struct {
name string
endPoint EndPoint
// net read Write
Connection
listener EventListener
// codec
reader Reader // @reader should be nil when @conn is a gettyWSConn object.
writer Writer
// handle logic
maxMsgLen int32
// heartbeat
period time.Duration
// done
wait time.Duration
once *sync.Once
done chan struct{}
// attribute
attrs *gxcontext.ValuesContext
// goroutines sync
grNum uatomic.Int32
lock sync.RWMutex
packetLock sync.RWMutex
}
func newSession(endPoint EndPoint, conn Connection) *session {
ss := &session{
name: defaultSessionName,
endPoint: endPoint,
Connection: conn,
maxMsgLen: maxReadBufLen,
period: period,
once: &sync.Once{},
done: make(chan struct{}),
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(context.Background()),
}
ss.Connection.setSession(ss)
ss.SetWriteTimeout(netIOTimeout)
ss.SetReadTimeout(netIOTimeout)
return ss
}
func newTCPSession(conn net.Conn, endPoint EndPoint) Session {
c := newGettyTCPConn(conn)
session := newSession(endPoint, c)
session.name = defaultTCPSessionName
return session
}
func newUDPSession(conn *net.UDPConn, endPoint EndPoint) Session {
c := newGettyUDPConn(conn)
session := newSession(endPoint, c)
session.name = defaultUDPSessionName
return session
}
func newWSSession(conn *websocket.Conn, endPoint EndPoint) Session {
c := newGettyWSConn(conn)
session := newSession(endPoint, c)
session.name = defaultWSSessionName
return session
}
func (s *session) Reset() {
*s = session{
name: defaultSessionName,
once: &sync.Once{},
done: make(chan struct{}),
period: period,
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(context.Background()),
}
}
func (s *session) Conn() net.Conn {
if tc, ok := s.Connection.(*gettyTCPConn); ok {
return tc.conn
}
if uc, ok := s.Connection.(*gettyUDPConn); ok {
return uc.conn
}
if wc, ok := s.Connection.(*gettyWSConn); ok {
return wc.conn.UnderlyingConn()
}
return nil
}
func (s *session) EndPoint() EndPoint {
return s.endPoint
}
func (s *session) gettyConn() *gettyConn {
if tc, ok := s.Connection.(*gettyTCPConn); ok {
return &(tc.gettyConn)
}
if uc, ok := s.Connection.(*gettyUDPConn); ok {
return &(uc.gettyConn)
}
if wc, ok := s.Connection.(*gettyWSConn); ok {
return &(wc.gettyConn)
}
return nil
}
// Stat get the connect statistic data
func (s *session) Stat() string {
var conn *gettyConn
if conn = s.gettyConn(); conn == nil {
return ""
}
return fmt.Sprintf(
outputFormat,
s.sessionToken(),
conn.readBytes.Load(),
conn.writeBytes.Load(),
conn.readPkgNum.Load(),
conn.writePkgNum.Load(),
)
}
// IsClosed check whether the session has been closed.
func (s *session) IsClosed() bool {
select {
case <-s.done:
return true
default:
return false
}
}
// SetMaxMsgLen set maximum package length of every package in (EventListener)OnMessage(@pkgs)
func (s *session) SetMaxMsgLen(length int) {
s.lock.Lock()
defer s.lock.Unlock()
s.maxMsgLen = int32(length)
}
// SetName set session name
func (s *session) SetName(name string) {
s.lock.Lock()
defer s.lock.Unlock()
s.name = name
}
// SetEventListener set event listener
func (s *session) SetEventListener(listener EventListener) {
s.lock.Lock()
defer s.lock.Unlock()
s.listener = listener
}
// SetPkgHandler set package handler
func (s *session) SetPkgHandler(handler ReadWriter) {
s.lock.Lock()
defer s.lock.Unlock()
s.reader = handler
s.writer = handler
}
func (s *session) SetReader(reader Reader) {
s.lock.Lock()
defer s.lock.Unlock()
s.reader = reader
}
func (s *session) SetWriter(writer Writer) {
s.lock.Lock()
defer s.lock.Unlock()
s.writer = writer
}
// SetCronPeriod period is in millisecond. Websocket session will send ping frame automatically every peroid.
func (s *session) SetCronPeriod(period int) {
if period < 1 {
panic("@period < 1")
}
s.lock.Lock()
defer s.lock.Unlock()
s.period = time.Duration(period) * time.Millisecond
}
// SetWaitTime set maximum wait time when session got error or got exit signal
func (s *session) SetWaitTime(waitTime time.Duration) {
if waitTime < 1 {
panic("@wait < 1")
}
s.lock.Lock()
defer s.lock.Unlock()
s.wait = waitTime
}
// GetAttribute get attribute of key @session:key
func (s *session) GetAttribute(key interface{}) interface{} {
s.lock.RLock()
if s.attrs == nil {
s.lock.RUnlock()
return nil
}
ret, flag := s.attrs.Get(key)
s.lock.RUnlock()
if !flag {
return nil
}
return ret
}
// SetAttribute set attribute of key @session:key
func (s *session) SetAttribute(key interface{}, value interface{}) {
s.lock.Lock()
if s.attrs != nil {
s.attrs.Set(key, value)
}
s.lock.Unlock()
}
// RemoveAttribute remove attribute of key @session:key
func (s *session) RemoveAttribute(key interface{}) {
s.lock.Lock()
if s.attrs != nil {
s.attrs.Delete(key)
}
s.lock.Unlock()
}
func (s *session) sessionToken() string {
if s.IsClosed() || s.Connection == nil {
return "session-closed"
}
return fmt.Sprintf("{%s:%s:%d:%s<->%s}",
s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr())
}
func (s *session) WritePkg(pkg interface{}, timeout time.Duration) (int, int, error) {
if pkg == nil {
return 0, 0, fmt.Errorf("@pkg is nil")
}
if s.IsClosed() {
return 0, 0, ErrSessionClosed
}
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
rBuf := make([]byte, size)
rBuf = rBuf[:runtime.Stack(rBuf, false)]
log.Errorf("[session.WritePkg] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
}
}()
pkgBytes, err := s.writer.Write(s, pkg)
if err != nil {
log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%+v", s.Stat(), pkg, err)
return len(pkgBytes), 0, perrors.WithStack(err)
}
var udpCtxPtr *UDPContext
if udpCtx, ok := pkg.(UDPContext); ok {
udpCtxPtr = &udpCtx
} else if udpCtxP, ok := pkg.(*UDPContext); ok {
udpCtxPtr = udpCtxP
}
if udpCtxPtr != nil {
udpCtxPtr.Pkg = pkgBytes
pkg = *udpCtxPtr
} else {
pkg = pkgBytes
}
s.packetLock.RLock()
defer s.packetLock.RUnlock()
if 0 < timeout {
s.Connection.SetWriteTimeout(timeout)
}
var succssCount int
succssCount, err = s.Connection.send(pkg)
if err != nil {
log.Warnf("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%+v", s.Stat(), pkg, err)
return len(pkgBytes), succssCount, perrors.WithStack(err)
}
return len(pkgBytes), succssCount, nil
}
// WriteBytes for codecs
func (s *session) WriteBytes(pkg []byte) (int, error) {
if s.IsClosed() {
return 0, ErrSessionClosed
}
leftPackageSize, totalSize, writeSize := len(pkg), len(pkg), 0
if leftPackageSize > maxPacketLen {
s.packetLock.Lock()
defer s.packetLock.Unlock()
} else {
s.packetLock.RLock()
defer s.packetLock.RUnlock()
}
for leftPackageSize > maxPacketLen {
_, err := s.Connection.send(pkg[writeSize:(writeSize + maxPacketLen)])
if err != nil {
return writeSize, perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
}
leftPackageSize -= maxPacketLen
writeSize += maxPacketLen
}
if leftPackageSize == 0 {
return writeSize, nil
}
_, err := s.Connection.send(pkg[writeSize:])
if err != nil {
return writeSize, perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
}
return totalSize, nil
}
// WriteBytesArray Write multiple packages at once. so we invoke write sys.call just one time.
func (s *session) WriteBytesArray(pkgs ...[]byte) (int, error) {
if s.IsClosed() {
return 0, ErrSessionClosed
}
if len(pkgs) == 1 {
return s.WriteBytes(pkgs[0])
}
// reduce syscall and memcopy for multiple packages
if _, ok := s.Connection.(*gettyTCPConn); ok {
s.packetLock.RLock()
defer s.packetLock.RUnlock()
lg, err := s.Connection.send(pkgs)
if err != nil {
return 0, perrors.Wrapf(err, "s.Connection.Write(pkgs num:%d)", len(pkgs))
}
return lg, nil
}
// get len
var (
l int
wlg int
err error
length int
arrp *[]byte
arr []byte
)
length = 0
for i := 0; i < len(pkgs); i++ {
length += len(pkgs[i])
}
// merge the pkgs
arrp = gxbytes.AcquireBytes(length)
defer gxbytes.ReleaseBytes(arrp)
arr = *arrp
l = 0
for i := 0; i < len(pkgs); i++ {
copy(arr[l:], pkgs[i])
l += len(pkgs[i])
}
wlg, err = s.WriteBytes(arr)
if err != nil {
return 0, perrors.WithStack(err)
}
num := len(pkgs) - 1
for i := 0; i < num; i++ {
s.incWritePkgNum()
}
return wlg, nil
}
func heartbeat(_ gxtime.TimerID, _ time.Time, arg interface{}) error {
ss, _ := arg.(*session)
if ss == nil || ss.IsClosed() {
return ErrSessionClosed
}
f := func() {
wsConn, wsFlag := ss.Connection.(*gettyWSConn)
if wsFlag {
err := wsConn.writePing()
if err != nil {
log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err))
}
}
ss.listener.OnCron(ss)
}
// if enable task pool, run @f asynchronously.
if taskPool := ss.EndPoint().GetTaskPool(); taskPool != nil {
taskPool.AddTaskAlways(f)
return nil
}
f()
return nil
}
// func (s *session) RunEventLoop() {
func (s *session) run() {
if s.Connection == nil || s.listener == nil || s.writer == nil {
errStr := fmt.Sprintf("session{name:%s, conn:%#v, listener:%#v, writer:%#v}",
s.name, s.Connection, s.listener, s.writer)
log.Error(errStr)
panic(errStr)
}
// call session opened
s.UpdateActive()
if err := s.listener.OnOpen(s); err != nil {
log.Errorf("[OnOpen] session %s, error: %#v", s.Stat(), err)
s.Close()
return
}
if _, err := defaultTimerWheel.AddTimer(heartbeat, gxtime.TimerLoop, s.period, s); err != nil {
panic(fmt.Sprintf("failed to add session %s to defaultTimerWheel err:%v", s.Stat(), err))
}
s.grNum.Add(1)
// start read gr
go s.handlePackage()
}
func (s *session) addTask(pkg interface{}) {
f := func() {
s.listener.OnMessage(s, pkg)
s.incReadPkgNum()
}
if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil {
taskPool.AddTaskAlways(f)
return
}
f()
}
func (s *session) handlePackage() {
var err error
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
rBuf := make([]byte, size)
rBuf = rBuf[:runtime.Stack(rBuf, false)]
log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
}
grNum := s.grNum.Add(-1)
log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum)
s.stop()
if err != nil {
log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), perrors.WithStack(err))
if s != nil || s.listener != nil {
s.listener.OnError(s, err)
}
}
s.listener.OnClose(s)
s.gc()
}()
if _, ok := s.Connection.(*gettyTCPConn); ok {
if s.reader == nil {
errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader)
log.Error(errStr)
panic(errStr)
}
err = s.handleTCPPackage()
} else if _, ok := s.Connection.(*gettyWSConn); ok {
err = s.handleWSPackage()
} else if _, ok := s.Connection.(*gettyUDPConn); ok {
err = s.handleUDPPackage()
} else {
panic(fmt.Sprintf("unknown type session{%#v}", s))
}
}
// get package from tcp stream(packet)
func (s *session) handleTCPPackage() error {
var (
ok bool
err error
netError net.Error
conn *gettyTCPConn
exit bool
bufLen int
pkgLen int
buf []byte
pktBuf *gxbytes.Buffer
pkg interface{}
)
pktBuf = gxbytes.NewBuffer(nil)
conn = s.Connection.(*gettyTCPConn)
for {
if s.IsClosed() {
err = nil
// do not handle the left stream in pktBuf and exit asap.
// it is impossible packing a package by the left stream.
break
}
bufLen = 0
for {
// for clause for the network timeout condition check
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
buf = pktBuf.WriteNextBegin(maxReadBufLen)
bufLen, err = conn.recv(buf)
if err != nil {
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
break
}
if perrors.Cause(err) == io.EOF {
log.Infof("%s, session.conn read EOF, client send over, session exit", s.sessionToken())
err = nil
exit = true
if bufLen != 0 {
// as https://github.com/apache/dubbo-getty/issues/77#issuecomment-939652203
// this branch is impossible. Even if it happens, the bufLen will be zero and the error
// is io.EOF when getty continues to read the socket.
exit = false
log.Infof("%s, session.conn read EOF, while the bufLen(%d) is non-zero.", s.sessionToken())
}
break
}
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), perrors.WithStack(err))
exit = true
}
break
}
if 0 != bufLen {
pktBuf.WriteNextEnd(bufLen)
for {
if pktBuf.Len() <= 0 {
break
}
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
// for case 3/case 4
if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) {
err = perrors.Errorf("pkgLen %d > session max message len %d", pkgLen, s.maxMsgLen)
}
// handle case 1
if err != nil {
log.Warnf("%s, [session.handleTCPPackage] = len{%d}, error:%+v",
s.sessionToken(), pkgLen, perrors.WithStack(err))
exit = true
break
}
// handle case 2/case 3
if pkg == nil {
break
}
// handle case 4
s.UpdateActive()
s.addTask(pkg)
pktBuf.Next(pkgLen)
// continue to handle case 5
}
}
if exit {
break
}
}
return perrors.WithStack(err)
}
// get package from udp packet
func (s *session) handleUDPPackage() error {
var (
ok bool
err error
netError net.Error
conn *gettyUDPConn
bufLen int
maxBufLen int
bufp *[]byte
buf []byte
addr *net.UDPAddr
pkgLen int
pkg interface{}
)
conn = s.Connection.(*gettyUDPConn)
maxBufLen = int(s.maxMsgLen + maxReadBufLen)
if int(s.maxMsgLen<<1) < bufLen {
maxBufLen = int(s.maxMsgLen << 1)
}
bufp = gxbytes.AcquireBytes(maxBufLen)
defer gxbytes.ReleaseBytes(bufp)
buf = *bufp
for {
if s.IsClosed() {
break
}
bufLen, addr, err = conn.recv(buf)
log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, perrors.WithStack(err))
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
continue
}
if err != nil {
log.Errorf("%s, [session.handleUDPPackage] = len:%d, error:%+v",
s.sessionToken(), bufLen, perrors.WithStack(err))
err = perrors.Wrapf(err, "conn.read()")
break
}
if bufLen == 0 {
log.Errorf("conn.read() = bufLen:%d, addr:%s, err:%+v", bufLen, addr, perrors.WithStack(err))
continue
}
if bufLen == len(connectPingPackage) && bytes.Equal(connectPingPackage, buf[:bufLen]) {
log.Infof("got %s connectPingPackage", addr)
continue
}
pkg, pkgLen, err = s.reader.Read(s, buf[:bufLen])
log.Debugf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err))
if err == nil && s.maxMsgLen > 0 && bufLen > int(s.maxMsgLen) {
err = perrors.Errorf("Message Too Long, bufLen %d, session max message len %d", bufLen, s.maxMsgLen)
}
if err != nil {
log.Warnf("%s, [session.handleUDPPackage] = len:%d, error:%+v",
s.sessionToken(), pkgLen, perrors.WithStack(err))
continue
}
if pkgLen == 0 {
log.Errorf("s.reader.Read() = pkg:%#v, pkgLen:%d, err:%+v", pkg, pkgLen, perrors.WithStack(err))
continue
}
s.UpdateActive()
s.addTask(UDPContext{Pkg: pkg, PeerAddr: addr})
}
return perrors.WithStack(err)
}
// get package from websocket stream
func (s *session) handleWSPackage() error {
var (
ok bool
err error
netError net.Error
length int
conn *gettyWSConn
pkg []byte
unmarshalPkg interface{}
)
conn = s.Connection.(*gettyWSConn)
for {
if s.IsClosed() {
break
}
pkg, err = conn.recv()
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
continue
}
if err != nil {
log.Warnf("%s, [session.handleWSPackage] = error:%+v",
s.sessionToken(), perrors.WithStack(err))
return perrors.WithStack(err)
}
s.UpdateActive()
if s.reader != nil {
unmarshalPkg, length, err = s.reader.Read(s, pkg)
if err == nil && s.maxMsgLen > 0 && length > int(s.maxMsgLen) {
err = perrors.Errorf("Message Too Long, length %d, session max message len %d", length, s.maxMsgLen)
}
if err != nil {
log.Warnf("%s, [session.handleWSPackage] = len:%d, error:%+v",
s.sessionToken(), length, perrors.WithStack(err))
continue
}
s.addTask(unmarshalPkg)
} else {
s.addTask(pkg)
}
}
return nil
}
func (s *session) stop() {
select {
case <-s.done: // s.done is a blocked channel. if it has not been closed, the default branch will be invoked.
return
default:
s.once.Do(func() {
// let read/Write timeout asap
now := time.Now()
if conn := s.Conn(); conn != nil {
conn.SetReadDeadline(now.Add(s.readTimeout()))
conn.SetWriteDeadline(now.Add(s.writeTimeout()))
}
close(s.done)
c := s.GetAttribute(sessionClientKey)
if clt, ok := c.(*client); ok {
clt.reConnect()
}
})
}
}
func (s *session) gc() {
var conn Connection
s.lock.Lock()
if s.attrs != nil {
s.attrs = nil
conn = s.Connection
s.Connection = nil
}
s.lock.Unlock()
go func() {
if conn != nil {
conn.close(int(s.wait))
}
}()
}
// Close will be invoked by NewSessionCallback(if return error is not nil)
// or (session)handleLoop automatically. It's thread safe.
func (s *session) Close() {
s.stop()
log.Infof("%s closed now. its current gr num is %d", s.sessionToken(), s.grNum.Load())
}
// GetActive return connection's time
func (s *session) GetActive() time.Time {
if s == nil {
return launchTime
}
s.lock.RLock()
defer s.lock.RUnlock()
if s.Connection != nil {
return s.Connection.GetActive()
}
return launchTime
}
// UpdateActive update connection's active time
func (s *session) UpdateActive() {
if s == nil {
return
}
s.lock.RLock()
defer s.lock.RUnlock()
if s.Connection != nil {
s.Connection.UpdateActive()
}
}
func (s *session) ID() uint32 {
if s == nil {
return 0
}
s.lock.RLock()
defer s.lock.RUnlock()
if s.Connection != nil {
return s.Connection.ID()
}
return 0
}
func (s *session) LocalAddr() string {
if s == nil {
return ""
}
s.lock.RLock()
defer s.lock.RUnlock()
if s.Connection != nil {
return s.Connection.LocalAddr()
}
return ""
}
func (s *session) RemoteAddr() string {
if s == nil {
return ""
}
s.lock.RLock()
defer s.lock.RUnlock()
if s.Connection != nil {
return s.Connection.RemoteAddr()
}
return ""
}
func (s *session) incReadPkgNum() {
if s == nil {
return
}
s.lock.RLock()
defer s.lock.RUnlock()
if s.Connection != nil {
s.Connection.incReadPkgNum()
}
}
func (s *session) incWritePkgNum() {
if s == nil {
return
}
s.lock.RLock()
defer s.lock.RUnlock()
if s.Connection != nil {
s.Connection.incWritePkgNum()
}
}
func (s *session) send(pkg interface{}) (int, error) {
if s == nil {
return 0, nil
}
s.lock.RLock()
defer s.lock.RUnlock()
if s.Connection != nil {
return s.Connection.send(pkg)
}
return 0, nil
}
func (s *session) readTimeout() time.Duration {
if s == nil {
return time.Duration(0)
}
s.lock.RLock()
defer s.lock.RUnlock()
if s.Connection != nil {
return s.Connection.readTimeout()
}
return time.Duration(0)
}
func (s *session) setSession(ss Session) {
if s == nil {
return
}
s.lock.RLock()
if s.Connection != nil {
s.Connection.setSession(ss)
}
s.lock.RUnlock()
}