blob: a185ebdeafc49a3b7218998eabdafe12cbba50e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package pusher
import (
"sync"
)
import (
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
)
type (
ResourceChangedCallbackFn func(items PushedItems)
ResourceChangedEventFilter func(resourceList core_model.ResourceList) core_model.ResourceList
)
type ResourceChangedCallback struct {
mu sync.Mutex // Only one can run at a time
Callback ResourceChangedCallbackFn
Filters []ResourceChangedEventFilter
}
func (c *ResourceChangedCallback) Invoke(items PushedItems) {
c.mu.Lock()
defer c.mu.Unlock()
pushed := items.resourceList
for _, filter := range c.Filters {
pushed = filter(items.resourceList)
}
if len(pushed.GetItems()) == 0 {
return
}
callback := c.Callback
callback(items)
}
type ResourceChangedCallbacks struct {
// mu to protect resourceChangedCallbacks
mu sync.RWMutex
callbackMap map[core_model.ResourceType]map[string]*ResourceChangedCallback
}
func NewResourceChangedCallbacks() *ResourceChangedCallbacks {
return &ResourceChangedCallbacks{
callbackMap: make(map[core_model.ResourceType]map[string]*ResourceChangedCallback),
}
}
func (callbacks *ResourceChangedCallbacks) InvokeCallbacks(resourceType core_model.ResourceType, items PushedItems) {
callbacks.mu.RLock()
defer callbacks.mu.RUnlock()
var wg sync.WaitGroup
for _, c := range callbacks.callbackMap[resourceType] {
tmpCallback := c
wg.Add(1)
go func() {
tmpCallback.Invoke(items)
wg.Done()
}()
}
wg.Wait()
}
func (callbacks *ResourceChangedCallbacks) AddCallBack(
resourceType core_model.ResourceType,
id string,
callback ResourceChangedCallbackFn,
filters ...ResourceChangedEventFilter,
) {
callbacks.mu.Lock()
defer callbacks.mu.Unlock()
if _, ok := callbacks.callbackMap[resourceType]; !ok {
callbacks.callbackMap[resourceType] = make(map[string]*ResourceChangedCallback)
}
callbacks.callbackMap[resourceType][id] = &ResourceChangedCallback{Callback: callback, Filters: filters}
}
func (callbacks *ResourceChangedCallbacks) RemoveCallBack(resourceType core_model.ResourceType, id string) {
callbacks.mu.Lock()
defer callbacks.mu.Unlock()
if _, ok := callbacks.callbackMap[resourceType]; !ok {
return
}
delete(callbacks.callbackMap[resourceType], id)
}
func (callbacks *ResourceChangedCallbacks) GetCallBack(resourceType core_model.ResourceType, id string) (*ResourceChangedCallback, bool) {
callbacks.mu.RLock()
defer callbacks.mu.RUnlock()
if _, ok := callbacks.callbackMap[resourceType]; !ok {
return nil, false
}
cb, ok := callbacks.callbackMap[resourceType][id]
return cb, ok
}