blob: f91b037abd7331b8da658ce3b3a619935188de65 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package pusher
import (
"context"
"reflect"
"time"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/core"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/registry"
"github.com/apache/dubbo-kubernetes/pkg/events"
)
var log = core.Log.WithName("dubbo").WithName("server").WithName("pusher")
const (
eventsChannelSize = 10000
requestChannelSize = 1000
)
type changedEvent struct {
resourceType core_model.ResourceType
event events.Event
}
type pusher struct {
ctx context.Context
resourceManager manager.ResourceManager
eventBus events.EventBus
newFullResyncTicker func() *time.Ticker
resourceTypes map[core_model.ResourceType]struct{}
resourceRevisions map[core_model.ResourceType]revision
resourceLastPushed map[core_model.ResourceType]core_model.ResourceList
resourceChangedEventListeners map[core_model.ResourceType]events.Listener
eventsChannel chan *changedEvent
requestChannel chan struct {
request interface{}
requestFilter ResourceRequestFilter
resourceType core_model.ResourceType
id string
}
resourceChangedCallbacks *ResourceChangedCallbacks
}
func NewPusher(
resourceManager manager.ResourceManager,
eventBus events.EventBus,
newFullResyncTicker func() *time.Ticker,
resourceTypes []core_model.ResourceType,
) Pusher {
p := &pusher{
resourceManager: resourceManager,
eventBus: eventBus,
newFullResyncTicker: newFullResyncTicker,
resourceTypes: make(map[core_model.ResourceType]struct{}),
resourceRevisions: make(map[core_model.ResourceType]revision),
resourceLastPushed: make(map[core_model.ResourceType]core_model.ResourceList),
resourceChangedEventListeners: make(map[core_model.ResourceType]events.Listener),
eventsChannel: make(chan *changedEvent, eventsChannelSize),
requestChannel: make(chan struct {
request interface{}
requestFilter ResourceRequestFilter
resourceType core_model.ResourceType
id string
}, requestChannelSize),
resourceChangedCallbacks: NewResourceChangedCallbacks(),
}
for _, resourceType := range resourceTypes {
p.registerResourceType(resourceType)
}
return p
}
func (p *pusher) registerResourceType(resourceType core_model.ResourceType) {
if _, ok := p.resourceTypes[resourceType]; ok {
return
}
p.resourceTypes[resourceType] = struct{}{}
p.resourceRevisions[resourceType] = 0
// subscribe Resource Changed Event
resourceChanged := p.eventBus.Subscribe(func(event events.Event) bool {
resourceChangedEvent, ok := event.(events.ResourceChangedEvent)
if ok {
return resourceChangedEvent.Type == resourceType
}
return false
})
p.resourceChangedEventListeners[resourceType] = resourceChanged
}
func (p *pusher) receiveResourceChangedEvents(stop <-chan struct{}, resourceType core_model.ResourceType) {
if _, ok := p.resourceTypes[resourceType]; !ok {
return
}
for {
select {
case <-stop:
p.resourceChangedEventListeners[resourceType].Close()
return
case event := <-p.resourceChangedEventListeners[resourceType].Recv():
p.eventsChannel <- &changedEvent{
resourceType: resourceType,
event: event,
}
}
}
}
func (p *pusher) Start(stop <-chan struct{}) error {
log.Info("pusher start")
ctx, cancel := context.WithCancel(context.Background())
// receive ResourceChanged Events
for resourceType := range p.resourceTypes {
log.Info("start receive ResourceChanged Event", "ResourceType", resourceType)
go p.receiveResourceChangedEvents(stop, resourceType)
}
fullResyncTicker := p.newFullResyncTicker()
defer fullResyncTicker.Stop()
for {
select {
case <-stop:
log.Info("pusher stopped")
cancel()
return nil
case ce := <-p.eventsChannel:
log.Info("event received", "ResourceType", ce.resourceType)
resourceList, err := registry.Global().NewList(ce.resourceType)
if err != nil {
log.Error(err, "failed to get resourceList")
continue
}
err = p.resourceManager.List(ctx, resourceList)
if err != nil {
log.Error(err, "list resource failed", "ResourceType", ce.resourceType)
continue
}
if reflect.DeepEqual(p.resourceLastPushed[ce.resourceType], resourceList) {
log.Info("resource not changed, nothing to push")
continue
}
p.resourceRevisions[ce.resourceType]++
p.resourceLastPushed[ce.resourceType] = resourceList
log.Info("invoke callbacks", "ResourceType", ce.resourceType, "revision", p.resourceRevisions[ce.resourceType])
// for a ResourceChangedEvent, invoke all callbacks.
p.resourceChangedCallbacks.InvokeCallbacks(ce.resourceType, PushedItems{
resourceList: resourceList,
revision: p.resourceRevisions[ce.resourceType],
})
case req := <-p.requestChannel:
resourceType := req.resourceType
id := req.id
log.Info("received a push request", "ResourceType", resourceType, "id", id)
cb, ok := p.resourceChangedCallbacks.GetCallBack(resourceType, id)
if !ok {
log.Info("not found callback", "ResourceType", resourceType, "id", id)
continue
}
revision := p.resourceRevisions[resourceType]
lastedPushed := p.resourceLastPushed[resourceType]
if lastedPushed == nil {
log.Info("last pushed is nil", "ResourceType", resourceType, "id", id)
continue
}
resourceList := lastedPushed
if req.requestFilter != nil {
resourceList = req.requestFilter(req.request, lastedPushed)
}
cb.Invoke(PushedItems{
resourceList: resourceList,
revision: revision,
})
case <-fullResyncTicker.C:
log.Info("full resync ticker arrived, starting resync for all types", "ResourceTypes", p.resourceTypes)
for resourceType := range p.resourceTypes {
revision := p.resourceRevisions[resourceType]
lastedPushed := p.resourceLastPushed[resourceType]
if lastedPushed == nil {
continue
}
// for a ResourceChangedEvent, invoke all callbacks.
p.resourceChangedCallbacks.InvokeCallbacks(resourceType, PushedItems{
resourceList: lastedPushed,
revision: revision,
})
}
}
}
}
func (p *pusher) NeedLeaderElection() bool {
return false
}
func (p *pusher) AddCallback(resourceType core_model.ResourceType, id string, callback ResourceChangedCallbackFn, filters ...ResourceChangedEventFilter) {
p.resourceChangedCallbacks.AddCallBack(resourceType, id, callback, filters...)
}
func (p *pusher) RemoveCallback(resourceType core_model.ResourceType, id string) {
p.resourceChangedCallbacks.RemoveCallBack(resourceType, id)
}
func (p *pusher) InvokeCallback(resourceType core_model.ResourceType, id string, request interface{}, requestFilter ResourceRequestFilter) {
p.requestChannel <- struct {
request interface{}
requestFilter ResourceRequestFilter
resourceType core_model.ResourceType
id string
}{
request: request,
requestFilter: requestFilter,
resourceType: resourceType,
id: id,
}
}