// blob: f5c64cca0fb4c11e5f8e7d45191f72b579cdf5dd
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tracker
import (
"context"
"sync"
"time"
)
import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_service_health "github.com/envoyproxy/go-control-plane/envoy/service/health/v3"
"github.com/go-logr/logr"
"github.com/pkg/errors"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
dp_server "github.com/apache/dubbo-kubernetes/pkg/config/dp-server"
"github.com/apache/dubbo-kubernetes/pkg/core"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
"github.com/apache/dubbo-kubernetes/pkg/core/xds"
hds_callbacks "github.com/apache/dubbo-kubernetes/pkg/hds/callbacks"
"github.com/apache/dubbo-kubernetes/pkg/util/watchdog"
util_xds_v3 "github.com/apache/dubbo-kubernetes/pkg/util/xds/v3"
"github.com/apache/dubbo-kubernetes/pkg/xds/envoy/names"
)
// streams is the per-Dataplane bookkeeping kept by tracker: the set of HDS
// streams currently associated with one Dataplane, plus the function that
// stops the watchdog started for that Dataplane.
type streams struct {
	// watchdogCancel stops the Dataplane's watchdog. It is nil until
	// OnHealthCheckRequest starts the watchdog for the first stream.
	watchdogCancel context.CancelFunc
	// activeStreams is used as a set of stream IDs associated with the
	// Dataplane; only true values are ever stored.
	activeStreams map[xds.StreamID]bool
}
// tracker implements the HDS callbacks. It maps health-check streams to the
// Dataplane they report for, runs one watchdog per Dataplane while that
// Dataplane has at least one associated stream, and writes endpoint health
// reported over the stream back onto the Dataplane resource.
type tracker struct {
	// resourceManager is used to read and update Dataplane resources.
	resourceManager manager.ResourceManager
	// config holds HDS settings; RefreshInterval drives the watchdog ticker.
	config *dp_server.HdsConfig
	// reconciler is invoked on every watchdog tick (Reconcile) and when the
	// watchdog stops (Clear).
	reconciler *reconciler
	log        logr.Logger

	sync.RWMutex // protects access to the fields below
	// streamsAssociation maps each stream to the Dataplane key it reported for.
	streamsAssociation map[xds.StreamID]core_model.ResourceKey
	// dpStreams holds the per-Dataplane stream bookkeeping.
	dpStreams map[core_model.ResourceKey]streams
}
// NewCallbacks returns the HDS callbacks backed by a tracker.
//
// The tracker associates health-check streams with Dataplanes, starts a
// watchdog per Dataplane that periodically drives a reconciler (whose snapshot
// generator reads through readOnlyResourceManager and stores results in cache,
// keyed via hasher), and uses resourceManager to persist the endpoint health
// reported by Envoy onto the Dataplane resource.
func NewCallbacks(
	log logr.Logger,
	resourceManager manager.ResourceManager,
	readOnlyResourceManager manager.ReadOnlyResourceManager,
	cache util_xds_v3.SnapshotCache,
	config *dp_server.HdsConfig,
	hasher util_xds_v3.NodeHash,
	defaultAdminPort uint32,
) hds_callbacks.Callbacks {
	rec := &reconciler{
		cache:     cache,
		hasher:    hasher,
		versioner: util_xds_v3.SnapshotAutoVersioner{UUID: core.NewUUID},
		generator: NewSnapshotGenerator(readOnlyResourceManager, config, defaultAdminPort),
	}

	t := &tracker{
		resourceManager:    resourceManager,
		config:             config,
		reconciler:         rec,
		log:                log,
		streamsAssociation: make(map[xds.StreamID]core_model.ResourceKey),
		dpStreams:          make(map[core_model.ResourceKey]streams),
	}
	return t
}
// OnStreamOpen does nothing and always succeeds. A stream is associated with a
// Dataplane only once a HealthCheckRequest arrives on it (see
// OnHealthCheckRequest).
func (t *tracker) OnStreamOpen(_ context.Context, _ int64) error {
	return nil
}
// OnStreamClosed drops the association between streamID and its Dataplane.
// When that was the Dataplane's last active stream, the Dataplane's watchdog
// is cancelled and its bookkeeping entry is removed. Streams that were never
// associated with a Dataplane are ignored.
func (t *tracker) OnStreamClosed(streamID xds.StreamID) {
	t.Lock()
	defer t.Unlock()

	dpKey, ok := t.streamsAssociation[streamID]
	if !ok {
		return // no HealthCheckRequest was associated with this stream
	}
	delete(t.streamsAssociation, streamID)

	entry := t.dpStreams[dpKey]
	// activeStreams is a map, so deleting through this copy of the struct
	// also mutates the entry stored in t.dpStreams.
	delete(entry.activeStreams, streamID)
	if len(entry.activeStreams) > 0 {
		return // other streams still report for this Dataplane
	}

	// no stream is active anymore, cancel watchdog
	if cancel := entry.watchdogCancel; cancel != nil {
		cancel()
	}
	delete(t.dpStreams, dpKey)
}
// OnHealthCheckRequest associates streamID with the Dataplane identified by
// the request's node ID and marks the stream active for that Dataplane. If no
// watchdog is running for the Dataplane yet, one is started in a new
// goroutine. It is stopped by OnStreamClosed once the last stream goes away.
//
// A node ID that cannot be parsed is logged and nil is returned.
func (t *tracker) OnHealthCheckRequest(streamID xds.StreamID, req *envoy_service_health.HealthCheckRequest) error {
	proxyID, err := xds.ParseProxyIdFromString(req.GetNode().GetId())
	if err != nil {
		t.log.Error(err, "failed to parse Dataplane Id out of HealthCheckRequest", "streamid", streamID, "req", req)
		return nil
	}
	key := proxyID.ToResourceKey()

	t.Lock()
	defer t.Unlock()

	entry := t.dpStreams[key]
	if entry.activeStreams == nil {
		entry.activeStreams = map[xds.StreamID]bool{}
	}
	entry.activeStreams[streamID] = true

	if entry.watchdogCancel == nil {
		// First stream for this Dataplane: kick off its watchdog. Closing
		// stop is how the watchdog is told to terminate.
		stop := make(chan struct{})
		entry.watchdogCancel = func() { close(stop) }
		go t.newWatchdog(req.Node).Start(stop)
		t.log.V(1).Info("started Watchdog for a Dataplane", "streamid", streamID, "proxyId", proxyID, "dataplaneKey", key)
	}

	// entry is a value copy, so store it back to persist a newly created
	// activeStreams map or watchdogCancel.
	t.dpStreams[key] = entry
	t.streamsAssociation[streamID] = key
	return nil
}
// newWatchdog builds the per-Dataplane watchdog for node.
//
// On every tick (interval: config.RefreshInterval) it calls
// reconciler.Reconcile for node. When stopped it calls reconciler.Clear for
// node. Errors from either step are logged, not propagated.
func (t *tracker) newWatchdog(node *envoy_core.Node) watchdog.Watchdog {
	return &watchdog.SimpleWatchdog{
		NewTicker: func() *time.Ticker {
			return time.NewTicker(t.config.RefreshInterval.Duration)
		},
		OnTick: func(ctx context.Context) error {
			return t.reconciler.Reconcile(ctx, node)
		},
		OnError: func(err error) {
			t.log.Error(err, "OnTick() failed", "nodeId", node.GetId())
		},
		OnStop: func() {
			// Use a distinct message so cleanup failures are not mistaken
			// for reconcile (tick) failures in the logs.
			if err := t.reconciler.Clear(node); err != nil {
				t.log.Error(err, "OnStop() failed", "nodeId", node.GetId())
			}
		},
	}
}
// OnEndpointHealthResponse converts an EndpointHealthResponse received on
// streamID into per-port health and applies it to the Dataplane associated
// with that stream (via updateDataplane).
//
// For each cluster only the first endpoint of the first locality is
// inspected; clusters with no localities or no endpoints are skipped.
// HEALTHY and UNKNOWN statuses count as healthy. The Envoy admin cluster's
// status is tracked separately as envoyHealth (true when absent). Every other
// cluster name must encode a local port, otherwise an error is returned.
func (t *tracker) OnEndpointHealthResponse(streamID xds.StreamID, resp *envoy_service_health.EndpointHealthResponse) error {
	healthMap := map[uint32]bool{}
	envoyHealth := true // if there is no Envoy HC, assume it's healthy
	for _, clusterHealth := range resp.GetClusterEndpointsHealth() {
		localities := clusterHealth.GetLocalityEndpointsHealth()
		if len(localities) == 0 {
			continue
		}
		endpoints := localities[0].GetEndpointsHealth()
		if len(endpoints) == 0 {
			continue
		}
		status := endpoints[0].GetHealthStatus()
		health := status == envoy_core.HealthStatus_HEALTHY || status == envoy_core.HealthStatus_UNKNOWN

		clusterName := clusterHealth.GetClusterName()
		if clusterName == names.GetEnvoyAdminClusterName() {
			envoyHealth = health
			continue
		}
		port, err := names.GetPortForLocalClusterName(clusterName)
		if err != nil {
			return errors.Wrapf(err, "could not extract port from cluster name %q", clusterName)
		}
		healthMap[port] = health
	}
	return t.updateDataplane(streamID, healthMap, envoyHealth)
}
// updateDataplane applies health reported on streamID to the Dataplane
// associated with that stream.
//
// healthMap maps a workload port to its reported health; envoyHealth is ANDed
// into every inbound, and inbounds whose port is missing from healthMap follow
// envoyHealth alone. Only NotReady->Ready and Ready->NotReady transitions are
// made; inbounds in any other state are left untouched. The resource is
// written back only when at least one inbound changed.
//
// It returns an error when the stream has no associated Dataplane or when
// reading/updating the resource fails.
func (t *tracker) updateDataplane(streamID xds.StreamID, healthMap map[uint32]bool, envoyHealth bool) error {
	// The mutex only guards the in-memory stream maps. Release it before
	// doing store I/O so a slow Get/Update does not block OnHealthCheckRequest
	// and OnStreamClosed, which need the write lock.
	t.RLock()
	dataplaneKey, hasAssociation := t.streamsAssociation[streamID]
	t.RUnlock()
	if !hasAssociation {
		return errors.Errorf("no proxy for streamID = %d", streamID)
	}

	dp := mesh.NewDataplaneResource()
	if err := t.resourceManager.Get(context.Background(), dp, store.GetBy(dataplaneKey)); err != nil {
		return err
	}

	changed := false
	for _, inbound := range dp.Spec.Networking.Inbound {
		intf := dp.Spec.Networking.ToInboundInterface(inbound)
		workloadHealth := envoyHealth
		if portHealth, exist := healthMap[intf.WorkloadPort]; exist {
			workloadHealth = portHealth && envoyHealth
		}

		switch {
		case workloadHealth && inbound.State == mesh_proto.Dataplane_Networking_Inbound_NotReady:
			inbound.State = mesh_proto.Dataplane_Networking_Inbound_Ready
		case !workloadHealth && inbound.State == mesh_proto.Dataplane_Networking_Inbound_Ready:
			inbound.State = mesh_proto.Dataplane_Networking_Inbound_NotReady
		default:
			continue // already in the desired state, or a state HDS does not manage
		}
		// write health for backwards compatibility with Dubbo 2.5 and older
		inbound.Health = &mesh_proto.Dataplane_Networking_Inbound_Health{
			Ready: workloadHealth,
		}
		changed = true
	}

	if !changed {
		return nil
	}
	t.log.V(1).Info("status updated", "dataplaneKey", dataplaneKey)
	return t.resourceManager.Update(context.Background(), dp)
}