blob: c813f958b4fc207f7190b51c2ca62590c5f80be3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package pubsub
import (
"encoding/json"
"fmt"
"net"
"strconv"
"sync"
"time"
"github.com/apache/servicecomb-kie/server/config"
"github.com/go-chassis/openlog"
"github.com/hashicorp/serf/cmd/serf/command/agent"
"github.com/hashicorp/serf/serf"
)
// Package-level state shared by Init, Start, join and Publish.
var (
	// once guards the body of Init so the bus is built a single time.
	once sync.Once
	// bus is the process-wide message bus; it is assigned inside Init.
	bus *Bus
)
// Constants for the pubsub event bus.
const (
// EventKVChange is the user event name Publish passes to the serf agent
// for key/value change events.
EventKVChange = "kv-chg"
// DefaultEventBatchSize is not referenced in the code visible here.
// NOTE(review): presumably the maximum number of events per batch — confirm at the use site.
DefaultEventBatchSize = 5000
// DefaultEventBatchInterval is not referenced in the code visible here.
// NOTE(review): presumably the interval between event batches — confirm at the use site.
DefaultEventBatchInterval = 500 * time.Millisecond
)
// topics is the observer registry. AddObserver stores, per encoded topic
// string, a *sync.Map whose keys are observer UUIDs and whose values are the
// observers themselves.
var topics sync.Map

// Topics returns a pointer to the package-level topic registry, so every call
// yields the same map.
func Topics() *sync.Map {
	registry := &topics
	return registry
}
// Bus is the message bus. It wraps the serf agent through which this package
// publishes events and joins other nodes.
type Bus struct {
// agent is the serf agent created in Init and started in Start.
agent *agent.Agent
}
// Init creates the serf agent and stores it in the package-level bus.
//
// The work is wrapped in once.Do, so repeated calls build the agent only one
// time. Non-empty ListenPeerAddr, AdvertiseAddr and NodeName values from
// config.Configurations override the serf/agent defaults. A failure to create
// the agent is reported through openlog.Fatal.
func Init() {
	once.Do(func() {
		agentConf := agent.DefaultConfig()
		agentConf.UserEventSizeLimit = 512

		serfConf := serf.DefaultConfig()
		// NOTE(review): presumably a pointer, so the edits below reach serfConf — confirm.
		memberConf := serfConf.MemberlistConfig

		if addr := config.Configurations.ListenPeerAddr; addr != "" {
			agentConf.BindAddr = addr
			memberConf.BindAddr, memberConf.BindPort = splitHostPort(addr, memberConf.BindAddr, memberConf.BindPort)
		}
		if addr := config.Configurations.AdvertiseAddr; addr != "" {
			agentConf.AdvertiseAddr = addr
			memberConf.AdvertiseAddr, memberConf.AdvertisePort = splitHostPort(addr, memberConf.AdvertiseAddr, memberConf.AdvertisePort)
		}
		if name := config.Configurations.NodeName; name != "" {
			serfConf.NodeName = name
		}

		created, err := agent.Create(agentConf, serfConf, nil)
		if err != nil {
			openlog.Fatal("can not sync key value change events to other kie nodes:" + err.Error())
		}
		bus = &Bus{agent: created}
	})
}
// splitHostPort parses advertiseAddr, expected in "host:port" form, into its
// host and numeric port.
//
// An empty advertiseAddr yields defaultHost and defaultPort unchanged. A
// malformed address or a non-numeric port is reported through openlog.Fatal
// together with the underlying parse error. If the configured logger returns
// instead of terminating the process, the defaults are returned rather than a
// half-parsed result.
func splitHostPort(advertiseAddr string, defaultHost string, defaultPort int) (string, int) {
	if advertiseAddr == "" {
		return defaultHost, defaultPort
	}
	host, port, err := net.SplitHostPort(advertiseAddr)
	if err != nil {
		openlog.Fatal(fmt.Sprintf("split string[%s] to host:port failed: %v", advertiseAddr, err))
		// Do not fall through to Atoi with an empty port.
		return defaultHost, defaultPort
	}
	p, err := strconv.Atoi(port)
	if err != nil {
		openlog.Fatal(fmt.Sprintf("invalid port in string[%s]: %v", advertiseAddr, err))
		return defaultHost, defaultPort
	}
	return host, p
}
// Start starts the serf agent held by the package-level bus, registers a
// ClusterEventHandler on it and, when config.Configurations.PeerAddr is set,
// joins that peer.
//
// Init must have been called first, because Start dereferences bus. Start and
// join failures are reported through openlog.Fatal and include the underlying
// error so the cause is visible in the log.
func Start() {
	if err := bus.agent.Start(); err != nil {
		openlog.Fatal("can not sync key value change events to other kie nodes:" + err.Error())
	}
	openlog.Info("kie message bus started")
	bus.agent.RegisterEventHandler(&ClusterEventHandler{})

	peer := config.Configurations.PeerAddr
	if peer == "" {
		return
	}
	if err := join([]string{peer}); err != nil {
		openlog.Fatal("lost event message: join kie node " + peer + " failed: " + err.Error())
		// Skip the success log if the logger returns.
		return
	}
	openlog.Info("join kie node:" + peer)
}
// join asks the bus's serf agent to join the given addresses, passing false
// as the agent's second Join argument. The count returned by Join is
// discarded; only the error is handed back to the caller.
func join(addresses []string) error {
	_, err := bus.agent.Join(addresses, false)
	return err
}
// Publish JSON-encodes event and sends it through the bus's serf agent as a
// user event named EventKVChange. It returns the marshalling error, if any,
// otherwise whatever the agent's UserEvent call returns.
func Publish(event *KVChangeEvent) error {
	payload, err := json.Marshal(event)
	if err == nil {
		err = bus.agent.UserEvent(EventKVChange, payload, true)
	}
	return err
}
// AddObserver registers o under the given topic so it can observe key changes
// by (key or labels) or (key and labels).
//
// The topic is encoded to a string, which is the key in the topic registry;
// that encoded string is returned on success. If encoding fails, an empty
// string and the encoding error are returned and nothing is registered.
//
// The per-topic observer map is created with LoadOrStore, so concurrent first
// registrations for the same topic share one map instead of overwriting each
// other's.
func AddObserver(o *Observer, topic *Topic) (string, error) {
	t, err := topic.Encode()
	if err != nil {
		return "", err
	}
	// Load first so an existing topic does not allocate a throwaway map.
	observers, existed := topics.Load(t)
	if !existed {
		observers, existed = topics.LoadOrStore(t, &sync.Map{})
	}
	observers.(*sync.Map).Store(o.UUID, o)
	if existed {
		openlog.Debug("add new observer for topic:" + t)
	} else {
		openlog.Info("new topic:" + t)
	}
	return t, nil
}
// RemoveObserver removes the observer identified by uuid from the given topic.
//
// If the topic cannot be encoded, the error is logged and nothing is removed.
// If no observers are registered for the topic, the call is a no-op rather
// than a panic.
func RemoveObserver(uuid string, topic *Topic) {
	t, err := topic.Encode()
	if err != nil {
		openlog.Error(err.Error())
		return
	}
	observers, found := topics.Load(t)
	if !found {
		// Unknown topic: nothing to remove.
		return
	}
	if m, ok := observers.(*sync.Map); ok {
		m.Delete(uuid)
	}
}