blob: a67ca8fb34d9e1ab0d9db435a7b44b8e618a97ad [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package pubsub
import (
"encoding/json"
"github.com/apache/servicecomb-kie/server/config"
"github.com/go-mesh/openlogging"
"github.com/hashicorp/serf/cmd/serf/command/agent"
"github.com/hashicorp/serf/serf"
"sync"
)
var once sync.Once
var bus *Bus
//const
const (
EventKVChange = "kv-changed"
)
var mutexObservers sync.RWMutex
var topics sync.Map
//Bus is message bug
type Bus struct {
agent *agent.Agent
}
//Init create serf agent
func Init() {
once.Do(func() {
ac := agent.DefaultConfig()
if config.Configurations.ListenPeerAddr != "" {
ac.BindAddr = config.Configurations.ListenPeerAddr
}
if config.Configurations.AdvertiseAddr != "" {
ac.AdvertiseAddr = config.Configurations.AdvertiseAddr
}
sc := serf.DefaultConfig()
if config.Configurations.NodeName != "" {
sc.NodeName = config.Configurations.NodeName
}
ac.UserEventSizeLimit = 512
a, err := agent.Create(ac, sc, nil)
if err != nil {
openlogging.Fatal("can not sync key value change events to other kie nodes:" + err.Error())
}
bus = &Bus{
agent: a,
}
if config.Configurations.PeerAddr != "" {
err := join([]string{config.Configurations.PeerAddr})
if err != nil {
openlogging.Fatal("lost event message")
} else {
openlogging.Info("join kie node:" + config.Configurations.PeerAddr)
}
}
})
}
//Start start serf agent
func Start() {
err := bus.agent.Start()
if err != nil {
openlogging.Fatal("can not sync key value change events to other kie nodes" + err.Error())
}
openlogging.Info("kie message bus started")
bus.agent.RegisterEventHandler(&EventHandler{})
}
func join(addresses []string) error {
_, err := bus.agent.Join(addresses, false)
if err != nil {
return err
}
return nil
}
//Publish send event
func Publish(event *KVChangeEvent) error {
b, err := json.Marshal(event)
if err != nil {
return err
}
return bus.agent.UserEvent(EventKVChange, b, true)
}
//ObserveOnce observe key changes by (key or labels) or (key and labels)
func ObserveOnce(o *Observer, topic *Topic) error {
topic.Format()
b, err := json.Marshal(topic)
if err != nil {
return err
}
t := string(b)
observers, ok := topics.Load(t)
if !ok {
topics.Store(t, map[string]*Observer{
o.UUID: o,
})
openlogging.Info("new topic:" + t)
return nil
}
mutexObservers.Lock()
observers.(map[string]*Observer)[o.UUID] = o
mutexObservers.Unlock()
openlogging.Debug("add new observer for topic:" + t)
return nil
}