blob: a92890e83f11216150811d1e92134ef44919324a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server
import (
"context"
"errors"
"strings"
"time"
)
import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
"github.com/go-logr/logr"
"golang.org/x/exp/maps"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/dds/reconcile"
"github.com/apache/dubbo-kubernetes/pkg/events"
util_maps "github.com/apache/dubbo-kubernetes/pkg/util/maps"
)
// EventBasedWatchdog drives reconciliation for a single node. Start subscribes
// to EventBus, accumulates the resource types reported by
// ResourceChangedEvents, and periodically hands them to Reconciler.
type EventBasedWatchdog struct {
// Ctx is passed to every Reconciler.Reconcile and Reconciler.Clear call.
Ctx context.Context
// Node is the node passed to the Reconciler on every Reconcile/Clear call.
Node *envoy_core.Node
// EventBus is the bus Start subscribes to for resource change events.
EventBus events.EventBus
// Reconciler is invoked with the accumulated changed types on every flush
// tick and asked to Clear when the watchdog stops.
Reconciler reconcile.Reconciler
// ProvidedTypes is the set of resource types this watchdog handles: events
// for other types are filtered out, and the whole set is scheduled on the
// first reconcile and on every full resync.
ProvidedTypes map[model.ResourceType]struct{}
// Log receives the watchdog's own log output and is also passed to
// Reconciler.Reconcile.
Log logr.Logger
// NewFlushTicker creates the ticker whose ticks trigger a reconcile of the
// types accumulated since the last successful flush.
NewFlushTicker func() *time.Ticker
// NewFullResyncTicker creates the ticker whose ticks schedule all
// ProvidedTypes for the next flush.
NewFullResyncTicker func() *time.Ticker
}
// Start runs the watchdog loop and blocks until stop is closed.
//
// It subscribes to EventBus for ResourceChangedEvents whose type is in
// ProvidedTypes and accumulates those types. On every flush tick the
// accumulated types (if any) are handed to Reconciler.Reconcile; on every full
// resync tick all ProvidedTypes are scheduled for the next flush. The very
// first flush reconciles every provided type. When stop is closed,
// Reconciler.Clear is called for the node, the subscription is closed and the
// method returns.
func (e *EventBasedWatchdog) Start(stop <-chan struct{}) {
	// Only resource change events for types this watchdog provides are delivered.
	listener := e.EventBus.Subscribe(func(event events.Event) bool {
		resChange, ok := event.(events.ResourceChangedEvent)
		if !ok {
			return false
		}
		_, provided := e.ProvidedTypes[resChange.Type]
		return provided
	})
	defer listener.Close()

	flushTicker := e.NewFlushTicker()
	defer flushTicker.Stop()
	fullResyncTicker := e.NewFullResyncTicker()
	defer fullResyncTicker.Stop()

	// For the first reconcile assign all types.
	changedTypes := maps.Clone(e.ProvidedTypes)
	reasons := map[string]struct{}{
		ReasonResync: {},
	}

	for {
		select {
		case <-stop:
			if err := e.Reconciler.Clear(e.Ctx, e.Node); err != nil {
				e.Log.Error(err, "reconcile clear failed")
			}
			return
		case <-flushTicker.C:
			if len(changedTypes) == 0 {
				// Nothing changed since the last successful flush.
				continue
			}
			reason := strings.Join(util_maps.SortedKeys(reasons), "_and_")
			e.Log.V(1).Info("reconcile", "changedTypes", changedTypes, "reason", reason)
			err, _ := e.Reconciler.Reconcile(e.Ctx, e.Node, changedTypes, e.Log)
			if err != nil && !errors.Is(err, context.Canceled) {
				// A real failure: report it and keep changedTypes/reasons
				// untouched so the same types are retried on the next flush.
				e.Log.Error(err, "reconcile failed", "changedTypes", changedTypes, "reason", reason)
				continue
			}
			// Success, or the context was canceled (not reported as a
			// failure): start accumulating from scratch. Fresh maps are
			// allocated rather than cleared in place because changedTypes was
			// handed to the Reconciler.
			changedTypes = map[model.ResourceType]struct{}{}
			reasons = map[string]struct{}{}
		case <-fullResyncTicker.C:
			e.Log.V(1).Info("schedule full resync")
			changedTypes = maps.Clone(e.ProvidedTypes)
			reasons[ReasonResync] = struct{}{}
		case event := <-listener.Recv():
			// The subscription predicate only lets ResourceChangedEvent through.
			resChange := event.(events.ResourceChangedEvent)
			e.Log.V(1).Info("schedule sync for type", "typ", resChange.Type)
			changedTypes[resChange.Type] = struct{}{}
			reasons[ReasonEvent] = struct{}{}
		}
	}
}