blob: c5b9ba61b668b90b911879dd76f48edeb33901e3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package syncernotify
import (
"context"
"encoding/json"
"fmt"
"time"
pb "github.com/apache/servicecomb-service-center/pkg/dump"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/gorilla/websocket"
)
type WebSocket struct {
ctx context.Context
conn *websocket.Conn
err error
free chan struct{}
closed chan struct{}
}
func DoWebSocketWatch(ctx context.Context, conn *websocket.Conn) {
log.Debugf("begin do websocket watch")
socket := NewWebSocket(ctx, conn)
process(socket)
}
func NewWebSocket(ctx context.Context, conn *websocket.Conn) *WebSocket {
return &WebSocket{
ctx: ctx,
conn: conn,
}
}
func process(socket *WebSocket) {
socket.Init()
socket.HandleWatchWebSocketControlMessage()
socket.Stop()
}
func (wh *WebSocket) Init() {
wh.free = make(chan struct{}, 1)
wh.closed = make(chan struct{})
wh.SetReady()
remoteAddr := wh.conn.RemoteAddr().String()
Instance().Accept(wh)
log.Debugf("start watching instance status, watcher[%s]", remoteAddr)
}
func (wh *WebSocket) Pick() interface{} {
select {
case <-wh.Ready():
if wh.Err() != nil {
return wh.Err()
}
select {
case e := <-GetSyncerNotifyCenter().instEventCh:
return e
default:
// reset if idle
wh.SetReady()
}
default:
}
return nil
}
func (wh *WebSocket) HandleWatchWebSocketJob(o interface{}) {
defer wh.SetReady()
var (
message []byte
remoteAddr = wh.conn.RemoteAddr().String()
)
switch o := o.(type) {
// error will be set in HandleWatchWebSocketControlMessage
case error:
log.Errorf(o, "watcher[%s] catch an err,", remoteAddr)
message = util.StringToBytesWithNoCopy(fmt.Sprintf("watcher catch an err: %s", o.Error()))
// InstanceChangedEvent will be set in OnEvent
case *pb.WatchInstanceChangedEvent:
resp := o
log.Infof("event[%s] is coming in, watcher[%s]", resp.Action, remoteAddr)
data, err := json.Marshal(resp)
if err != nil {
log.Errorf(err, "watcher[%s] watch %s", remoteAddr)
message = util.StringToBytesWithNoCopy(fmt.Sprintf("marshal output file error, %s", err.Error()))
break
}
message = data
default:
log.Errorf(nil, "watcher[%s] unknown input %v", remoteAddr, o)
return
}
select {
case <-wh.closed:
return
default:
}
err := wh.WriteMessage(message)
if err != nil {
log.Errorf(err, "watcher[%s] catch an err", remoteAddr)
}
}
func (wh *WebSocket) ReadTimeout() time.Duration {
return ReadTimeout
}
func (wh *WebSocket) SendTimeout() time.Duration {
return SendTimeout
}
func (wh *WebSocket) Heartbeat(messageType int) error {
err := wh.conn.WriteControl(messageType, []byte{}, time.Now().Add(wh.SendTimeout()))
if err != nil {
messageTypeName := "Ping"
if messageType == websocket.PongMessage {
messageTypeName = "Pong"
}
log.Errorf(err, "fail to send '%s' to watcher[%s]", messageTypeName, wh.conn.RemoteAddr())
return err
}
return nil
}
func (wh *WebSocket) HandleWatchWebSocketControlMessage() {
remoteAddr := wh.conn.RemoteAddr().String()
// PING
wh.conn.SetPingHandler(func(message string) error {
defer func() {
err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
if err != nil {
log.Error("", err)
}
}()
log.Debugf("received 'Ping' message '%s' from watcher[%s]", message, remoteAddr)
return wh.Heartbeat(websocket.PongMessage)
})
// PONG
wh.conn.SetPongHandler(func(message string) error {
defer func() {
err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
if err != nil {
log.Error("", err)
}
}()
log.Debugf("received 'Pong' message '%s' from watcher[%s]", message, remoteAddr)
return nil
})
// CLOSE
wh.conn.SetCloseHandler(func(code int, text string) error {
log.Infof("watcher[%s] active closed, code: %d, message: '%s'", remoteAddr, code, text)
return wh.sendClose(code, text)
})
wh.conn.SetReadLimit(ReadMaxBody)
err := wh.conn.SetReadDeadline(time.Now().Add(wh.ReadTimeout()))
if err != nil {
log.Error("", err)
}
for {
_, _, err := wh.conn.ReadMessage()
if err != nil {
// client close or conn broken
wh.SetError(err)
return
}
}
}
func (wh *WebSocket) sendClose(code int, text string) error {
remoteAddr := wh.conn.RemoteAddr().String()
var message []byte
if code != websocket.CloseNoStatusReceived {
message = websocket.FormatCloseMessage(code, text)
}
err := wh.conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(wh.SendTimeout()))
if err != nil {
log.Errorf(err, "watcher[%s] catch an err", remoteAddr)
return err
}
return nil
}
func (wh *WebSocket) WriteMessage(message []byte) error {
err := wh.conn.SetWriteDeadline(time.Now().Add(wh.SendTimeout()))
if err != nil {
return err
}
return wh.conn.WriteMessage(websocket.TextMessage, message)
}
func (wh *WebSocket) Ready() <-chan struct{} {
return wh.free
}
func (wh *WebSocket) SetReady() {
select {
case wh.free <- struct{}{}:
default:
}
}
func (wh *WebSocket) Stop() {
close(wh.closed)
}
func (wh *WebSocket) SetError(e error) {
wh.err = e
}
func (wh *WebSocket) Err() error {
return wh.err
}