blob: bd5c09dc00b8042f933b9c60a6edef9d9ae0e9c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package mux
import (
"context"
"time"
)
import (
"github.com/go-logr/logr"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/config/multizone"
"github.com/apache/dubbo-kubernetes/pkg/core"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/system"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
"github.com/apache/dubbo-kubernetes/pkg/dds/service"
"github.com/apache/dubbo-kubernetes/pkg/events"
dubbo_log "github.com/apache/dubbo-kubernetes/pkg/log"
)
type (
	// zone is the key under which ZoneWatch tracks a zone in its zones map.
	zone struct {
		zone string // zone name, as carried by service.ZoneOpenedStream
	}

	// ZoneWatch tracks zones that have opened a stream and emits
	// service.ZoneWentOffline once a tracked zone stops reporting health.
	ZoneWatch struct {
		log        logr.Logger
		poll       time.Duration // interval between health evaluations of tracked zones
		timeout    time.Duration // max allowed age of the newest health check / stream open
		bus        events.EventBus
		extensions context.Context // passed to dubbo_log.AddFieldsFromCtx when logging
		rm         manager.ReadOnlyResourceManager
		zones      map[zone]time.Time // zone -> time its most recent stream was opened
	}
)
// NewZoneWatch builds a ZoneWatch that evaluates tracked zones every
// cfg.PollInterval and treats a zone as offline once cfg.Timeout has passed
// without a health check or a newly opened stream.
//
// The returned error is currently always nil.
//
// NOTE(review): Start hands the poll interval to time.NewTicker, which panics
// on a non-positive duration, so callers must not start a watch built from a
// config with a zero PollInterval.
func NewZoneWatch(
	log logr.Logger,
	cfg multizone.ZoneHealthCheckConfig,
	bus events.EventBus,
	rm manager.ReadOnlyResourceManager,
	extensions context.Context,
) (*ZoneWatch, error) {
	zw := &ZoneWatch{
		log:        log,
		bus:        bus,
		rm:         rm,
		extensions: extensions,
		zones:      make(map[zone]time.Time),
	}
	zw.poll = cfg.PollInterval.Duration
	zw.timeout = cfg.Timeout.Duration
	return zw, nil
}
// Start runs the zone health watch until stop is closed, then returns nil.
//
// It subscribes to service.ZoneOpenedStream events and records, per zone, when
// a stream was last opened. On every poll tick it re-evaluates each tracked
// zone and emits service.ZoneWentOffline (and stops tracking the zone) when
// the zone's ZoneInsight no longer exists, or when the newer of its last
// health check and its last stream open is older than the configured timeout.
//
// zw.zones is only read and written from the goroutine running Start.
func (zw *ZoneWatch) Start(stop <-chan struct{}) error {
	ticker := time.NewTicker(zw.poll)
	defer ticker.Stop()

	connectionWatch := zw.bus.Subscribe(func(e events.Event) bool {
		_, ok := e.(service.ZoneOpenedStream)
		return ok
	})
	defer connectionWatch.Close()

	// Neither the context nor the derived logger depends on the zone being
	// checked, so build them once rather than once per zone per tick.
	ctx := context.Background()
	log := dubbo_log.AddFieldsFromCtx(zw.log, ctx, zw.extensions)

	for {
		select {
		case <-ticker.C:
			zw.checkZones(ctx, log)
		case e := <-connectionWatch.Recv():
			// The subscription predicate above only admits ZoneOpenedStream.
			opened := e.(service.ZoneOpenedStream)
			zw.recordStreamOpened(opened.Zone)
		case <-stop:
			return nil
		}
	}
}

// checkZones evaluates every tracked zone once and reports those considered
// offline. Lookup errors other than "not found" are logged, and the zone is
// kept for the next tick.
func (zw *ZoneWatch) checkZones(ctx context.Context, log logr.Logger) {
	// Deleting entries from a map while ranging over it is permitted in Go.
	for z, lastStreamOpened := range zw.zones {
		zoneInsight := system.NewZoneInsightResource()
		if err := zw.rm.Get(ctx, zoneInsight, store.GetByKey(z.zone, model.NoMesh)); err != nil {
			if store.IsResourceNotFound(err) {
				zw.reportOffline(z)
			} else {
				log.Info("error getting ZoneInsight", "zone", z.zone, "error", err)
			}
			continue
		}

		// The zone may not have sent a health check since (re)connecting, so
		// fall back to the stream-open time when it is more recent: the zone
		// was certainly connected at that point.
		lastHealthCheck := zoneInsight.Spec.GetHealthCheck().GetTime().AsTime()
		if lastStreamOpened.After(lastHealthCheck) {
			lastHealthCheck = lastStreamOpened
		}
		// Measure with the same clock that recorded lastStreamOpened.
		if core.Now().Sub(lastHealthCheck) > zw.timeout {
			zw.reportOffline(z)
		}
	}
}

// reportOffline publishes service.ZoneWentOffline for z and stops tracking it
// until the zone opens a new stream.
func (zw *ZoneWatch) reportOffline(z zone) {
	zw.bus.Send(service.ZoneWentOffline{
		Zone: z.zone,
	})
	delete(zw.zones, z)
}

// recordStreamOpened remembers when the named zone last opened a stream.
//
// Recording this time prevents a reconnecting zone from being timed out by a
// poll that still only sees the health check from its previous connection.
// Without it, this sequence is possible:
//   - long ago: the zone CP disconnects and stops sending health checks;
//   - now: the zone CP reopens its streams and the global CP receives
//     ZoneOpenedStream;
//   - the global CP polls, sees only the stale health check and kills the
//     streams before the zone's first new health check arrives.
func (zw *ZoneWatch) recordStreamOpened(zoneName string) {
	zw.zones[zone{zone: zoneName}] = core.Now()
}
// NeedLeaderElection always reports false, so the watch does not request to
// run only on the elected leader (presumably the component manager then starts
// it on every instance — confirm against the manager's contract).
func (zw *ZoneWatch) NeedLeaderElection() bool { return false }