refactor (#64)
diff --git a/server/pubsub/event_handler.go b/server/pubsub/event_handler.go
index 338c5a7..941e143 100644
--- a/server/pubsub/event_handler.go
+++ b/server/pubsub/event_handler.go
@@ -33,31 +33,38 @@
switch e.EventType().String() {
case "user":
if strings.Contains(e.String(), EventKVChange) {
- ue := e.(serf.UserEvent)
- ke, err := NewKVChangeEvent(ue.Payload)
- if err != nil {
- openlogging.Error("invalid json:" + string(ue.Payload))
- }
- openlogging.Debug("kv event:" + ke.Key)
- topics.Range(func(key, value interface{}) bool { //range all topics
- t, err := ParseTopicString(key.(string))
- if err != nil {
- openlogging.Error("can not parse topic:" + key.(string))
- return true
- }
- if t.Match(ke) {
- observers := value.(map[string]*Observer)
- mutexObservers.Lock()
- defer mutexObservers.Unlock()
- for k, v := range observers {
- v.Event <- ke
- delete(observers, k)
- }
- }
- return true
- })
+ handleKVEvent(e)
}
-
}
}
+
+func handleKVEvent(e serf.Event) {
+ ue := e.(serf.UserEvent)
+ ke, err := NewKVChangeEvent(ue.Payload)
+ if err != nil {
+ openlogging.Error("invalid json:" + string(ue.Payload))
+ }
+ openlogging.Debug("kv event:" + ke.Key)
+ topics.Range(func(key, value interface{}) bool { //range all topics
+ t, err := ParseTopicString(key.(string))
+ if err != nil {
+ openlogging.Error("can not parse topic:" + key.(string))
+ return true
+ }
+ if t.Match(ke) {
+ fireEvent(value, ke)
+ }
+ return true
+ })
+}
+
+func fireEvent(value interface{}, ke *KVChangeEvent) {
+ observers := value.(map[string]*Observer)
+ mutexObservers.Lock()
+ defer mutexObservers.Unlock()
+ for k, v := range observers {
+ v.Event <- ke
+ delete(observers, k)
+ }
+}