blob: 6c696dbffc9ca081a5ddb40da8c0f96efee1a22d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"errors"
"sync"
"time"
)
import (
getty "github.com/apache/dubbo-getty"
)
// Package-level values shared by the echo handlers in this file.
var (
// errTooManySessions is returned by EchoMessageHandler.OnOpen when the
// session map already holds at least conf.SessionNumber entries.
errTooManySessions = errors.New("Too many echo sessions!")
// msgHandler is the PackageHandler that newEchoMessageHandler registers
// under echoCmd.
msgHandler = &MessageHandler{}
// echoMsgHandler is the EchoMessageHandler built by newEchoMessageHandler
// when the package's variables are initialized.
echoMsgHandler = newEchoMessageHandler()
)
// PackageHandler handles an EchoPackage received on a session.
// EchoMessageHandler.OnMessage selects the implementation by the package's
// header command.
type PackageHandler interface {
// Handle processes the package for the given session. The returned error
// is inspected by EchoMessageHandler.OnMessage.
Handle(getty.Session, *EchoPackage) error
}
////////////////////////////////////////////
// message handler
////////////////////////////////////////////
// MessageHandler is the PackageHandler whose Handle method writes each
// received EchoPackage back to the session it arrived on.
type MessageHandler struct{}
// Handle echoes pkg back to session using the configured write timeout.
//
// When the write fails the failure is logged, the session is closed and the
// write error is returned; otherwise Handle returns nil.
func (h *MessageHandler) Handle(session getty.Session, pkg *EchoPackage) error {
	log.Debugf("get echo package{%s}", pkg)

	// write echo message handle logic here.
	if _, _, writeErr := session.WritePkg(pkg, conf.GettySessionParam.waitTimeout); writeErr != nil {
		log.Warnf("session.WritePkg(session{%s}, pkg{%s}) = error{%v}", session.Stat(), pkg, writeErr)
		session.Close()
		return writeErr
	}

	return nil
}
////////////////////////////////////////////
// EchoMessageHandler
////////////////////////////////////////////
// clientEchoSession is the per-session record stored in
// EchoMessageHandler.sessionMap.
type clientEchoSession struct {
// session is the getty session this record was created for.
session getty.Session
// reqNum is incremented by EchoMessageHandler.OnMessage when the package
// handler returns an error, and is printed in the OnCron timeout log.
reqNum int32
}
// EchoMessageHandler tracks open sessions and dispatches incoming echo
// packages to the PackageHandler registered for their header command.
type EchoMessageHandler struct {
// handlers maps a package header command to its handler; it is populated
// in newEchoMessageHandler.
handlers map[uint32]PackageHandler
// rwlock guards sessionMap.
rwlock sync.RWMutex
// sessionMap holds one record per tracked session, keyed by the session.
sessionMap map[getty.Session]*clientEchoSession
}
// newEchoMessageHandler builds an EchoMessageHandler with an empty session
// map and msgHandler registered as the handler for echoCmd.
func newEchoMessageHandler() *EchoMessageHandler {
	return &EchoMessageHandler{
		handlers: map[uint32]PackageHandler{
			echoCmd: msgHandler,
		},
		sessionMap: make(map[getty.Session]*clientEchoSession),
	}
}
// OnOpen registers session in the session map.
//
// It returns errTooManySessions, without registering the session, when the
// map already holds at least conf.SessionNumber sessions; otherwise it
// returns nil.
func (h *EchoMessageHandler) OnOpen(session getty.Session) error {
	// The limit check and the insert must share one write-locked critical
	// section. Checking under a read lock and inserting under a separate
	// write lock lets concurrent opens all pass the check and push the map
	// past conf.SessionNumber.
	h.rwlock.Lock()
	if conf.SessionNumber <= len(h.sessionMap) {
		h.rwlock.Unlock()
		return errTooManySessions
	}
	h.sessionMap[session] = &clientEchoSession{session: session}
	h.rwlock.Unlock()

	// Log outside the lock so session.Stat() never runs while it is held.
	log.Infof("got session:%s", session.Stat())
	return nil
}
// OnError logs the error reported for session and drops the session from the
// session map. It does not close the session itself.
func (h *EchoMessageHandler) OnError(session getty.Session, err error) {
	log.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)

	h.rwlock.Lock()
	defer h.rwlock.Unlock()
	delete(h.sessionMap, session)
}
// OnClose logs that session is closing and drops it from the session map.
func (h *EchoMessageHandler) OnClose(session getty.Session) {
	log.Infof("session{%s} is closing......", session.Stat())

	h.rwlock.Lock()
	defer h.rwlock.Unlock()
	delete(h.sessionMap, session)
}
// OnMessage dispatches a received package to the handler registered for its
// header command.
//
// pkg is expected to be an *EchoPackage. Any other payload is logged and
// dropped, as is a package whose command has no registered handler.
func (h *EchoMessageHandler) OnMessage(session getty.Session, pkg interface{}) {
	p, ok := pkg.(*EchoPackage)
	if !ok {
		// Use a checked assertion: an unchecked pkg.([]byte) would panic for
		// any payload that is neither *EchoPackage nor []byte.
		if raw, isBytes := pkg.([]byte); isBytes {
			log.Errorf("illegal packge{%#v}, %s", pkg, string(raw))
		} else {
			log.Errorf("illegal packge{%#v}", pkg)
		}
		return
	}

	handler, ok := h.handlers[p.H.Command]
	if !ok {
		log.Errorf("illegal command{%d}", p.H.Command)
		return
	}

	if err := handler.Handle(session, p); err != nil {
		// NOTE(review): reqNum is only bumped when the handler fails, which
		// looks inverted for a request counter. Behaviour is kept as found;
		// confirm the intent.
		h.rwlock.Lock()
		if record, tracked := h.sessionMap[session]; tracked {
			record.reqNum++
		}
		h.rwlock.Unlock()
	}
}
// OnCron checks whether a tracked session has been idle for longer than
// conf.sessionTimeout. If so, it logs the timeout, removes the session from
// the session map and closes it. Sessions absent from the map are ignored.
func (h *EchoMessageHandler) OnCron(session getty.Session) {
	expired := false

	h.rwlock.RLock()
	if record, tracked := h.sessionMap[session]; tracked {
		// Measure the idle time once so the logged duration is exactly the
		// value that was compared against the timeout.
		idle := time.Since(session.GetActive())
		if conf.sessionTimeout.Nanoseconds() < idle.Nanoseconds() {
			expired = true
			log.Warnf("session{%s} timeout{%s}, reqNum{%d}",
				session.Stat(), idle.String(), record.reqNum)
		}
	}
	h.rwlock.RUnlock()

	if !expired {
		return
	}

	// Removing a key that is already gone is a no-op, so a concurrent
	// OnClose/OnError between the two critical sections is harmless.
	h.rwlock.Lock()
	delete(h.sessionMap, session)
	h.rwlock.Unlock()
	session.Close()
}