blob: 9d0a5e59be9fc64687e920f1346c232316c3d0d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package v3
import (
"context"
"time"
)
import (
envoy_service_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
)
import (
"github.com/apache/dubbo-kubernetes/pkg/core"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
core_runtime "github.com/apache/dubbo-kubernetes/pkg/core/runtime"
core_xds "github.com/apache/dubbo-kubernetes/pkg/core/xds"
util_xds "github.com/apache/dubbo-kubernetes/pkg/util/xds"
util_xds_v3 "github.com/apache/dubbo-kubernetes/pkg/util/xds/v3"
xds_context "github.com/apache/dubbo-kubernetes/pkg/xds/context"
"github.com/apache/dubbo-kubernetes/pkg/xds/envoy"
"github.com/apache/dubbo-kubernetes/pkg/xds/generator"
xds_callbacks "github.com/apache/dubbo-kubernetes/pkg/xds/server/callbacks"
xds_sync "github.com/apache/dubbo-kubernetes/pkg/xds/sync"
)
// xdsServerLog is the package logger, derived from core.Log under the name
// "xds-server".
var xdsServerLog = core.Log.WithName("xds-server")
// RegisterXDS assembles the xDS v3 Aggregated Discovery Service and registers
// it on the gRPC server exposed by rt.DpServer().
//
// It creates a fresh XdsContext, the default and ingress reconcilers, a
// dataplane watchdog factory and a chain of server callbacks. When the runtime
// supplies additional callbacks through rt.XDS().ServerCallbacks, they are
// appended at the end of the chain.
//
// The only error returned is the one produced while creating the dataplane
// watchdog factory.
func RegisterXDS(
	statsCallbacks util_xds.StatsCallbacks,
	envoyCpCtx *xds_context.ControlPlaneContext,
	rt core_runtime.Runtime,
) error {
	xdsCtx := NewXdsContext()
	tracker := xds_callbacks.NewDataplaneMetadataTracker()
	dpReconciler := DefaultReconciler(rt, xdsCtx, statsCallbacks)
	ingReconciler := DefaultIngressReconciler(rt, xdsCtx, statsCallbacks)

	factory, err := xds_sync.DefaultDataplaneWatchdogFactory(
		rt,
		tracker,
		dpReconciler,
		ingReconciler,
		nil,
		envoyCpCtx,
		core_xds.APIVersion(envoy.APIV3),
	)
	if err != nil {
		return err
	}

	// Each callback is built in the same order in which it is placed in the
	// chain below.
	controlPlaneID := util_xds_v3.NewControlPlaneIdCallbacks(rt.GetInstanceId())
	stats := util_xds_v3.AdaptCallbacks(statsCallbacks)
	metadata := util_xds_v3.AdaptCallbacks(xds_callbacks.DataplaneCallbacksToXdsCallbacks(tracker))
	syncTracker := util_xds_v3.AdaptCallbacks(
		xds_callbacks.DataplaneCallbacksToXdsCallbacks(xds_callbacks.NewDataplaneSyncTracker(factory.New)),
	)
	lifecycle := util_xds_v3.AdaptCallbacks(
		xds_callbacks.DataplaneCallbacksToXdsCallbacks(
			xds_callbacks.NewDataplaneLifecycle(
				rt.AppContext(),
				rt.ResourceManager(),
				rt.Config().XdsServer.DataplaneDeregistrationDelay.Duration,
				rt.GetInstanceId(),
			),
		),
	)
	status := util_xds_v3.AdaptCallbacks(DefaultDataplaneStatusTracker(rt))
	// NOTE(review): the backoff argument is the bare literal 10; its unit
	// depends on NewNackBackoff's parameter type, which is not visible
	// here — confirm this is the intended value.
	nackBackoff := util_xds_v3.AdaptCallbacks(xds_callbacks.NewNackBackoff(10))

	chain := util_xds_v3.CallbacksChain{
		controlPlaneID,
		stats,
		metadata,
		syncTracker,
		lifecycle,
		status,
		nackBackoff,
	}
	if extra := rt.XDS().ServerCallbacks; extra != nil {
		chain = append(chain, util_xds_v3.AdaptCallbacks(extra))
	}

	adsServer := envoy_server.NewServer(context.Background(), xdsCtx.Cache(), chain)
	xdsServerLog.Info("registering Aggregated Discovery Service V3 in Dataplane Server")
	envoy_service_discovery.RegisterAggregatedDiscoveryServiceServer(rt.DpServer().GrpcServer(), adsServer)
	return nil
}
// DefaultReconciler returns a snapshot reconciler whose generator is a
// TemplateSnapshotGenerator configured with generator.DefaultProxy and whose
// cacher is built from the hasher and cache of xdsContext. The supplied
// statsCallbacks are stored on the reconciler.
//
// rt is part of the signature but is not used by this function.
func DefaultReconciler(
	rt core_runtime.Runtime,
	xdsContext XdsContext,
	statsCallbacks util_xds.StatsCallbacks,
) xds_sync.SnapshotReconciler {
	templates := []string{generator.DefaultProxy}
	snapshotGen := &TemplateSnapshotGenerator{templates}
	snapshotCache := &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()}

	return &reconciler{
		generator:      snapshotGen,
		cacher:         snapshotCache,
		statsCallbacks: statsCallbacks,
	}
}
// DefaultIngressReconciler returns a snapshot reconciler whose generator is a
// TemplateSnapshotGenerator configured with generator.IngressProxy and whose
// cacher is built from the hasher and cache of xdsContext.
//
// The supplied statsCallbacks are stored on the reconciler, matching
// DefaultReconciler. Previously the parameter was accepted but silently
// replaced with nil, so the callbacks handed in by the caller were never seen
// by the ingress reconciler.
//
// rt is part of the signature but is not used by this function.
func DefaultIngressReconciler(
	rt core_runtime.Runtime,
	xdsContext XdsContext,
	statsCallbacks util_xds.StatsCallbacks,
) xds_sync.SnapshotReconciler {
	return &reconciler{
		generator:      &TemplateSnapshotGenerator{[]string{generator.IngressProxy}},
		cacher:         &simpleSnapshotCacher{xdsContext.Hasher(), xdsContext.Cache()},
		statsCallbacks: statsCallbacks,
	}
}
// DefaultDataplaneStatusTracker returns a DataplaneStatusTracker built from rt
// and a factory that creates a DataplaneInsightSink for each dataplane type
// and subscription status accessor it is called with.
//
// Every sink receives two ticker factories (one driven by the configured
// XdsServer.DataplaneStatusFlushInterval, one fixed at five minutes), a
// duration equal to one tenth of that flush interval, and a
// DataplaneInsightStore backed by rt.ResourceManager().
func DefaultDataplaneStatusTracker(rt core_runtime.Runtime) xds_callbacks.DataplaneStatusTracker {
	newSink := func(
		dataplaneType core_model.ResourceType,
		accessor xds_callbacks.SubscriptionStatusAccessor,
	) xds_callbacks.DataplaneInsightSink {
		// The configuration is read each time a ticker is requested, not
		// once up front.
		flushTicker := func() *time.Ticker {
			interval := rt.Config().XdsServer.DataplaneStatusFlushInterval.Duration
			return time.NewTicker(interval)
		}
		fixedTicker := func() *time.Ticker {
			// Use default settings for now
			return time.NewTicker(5 * time.Minute)
		}
		tenthOfFlush := rt.Config().XdsServer.DataplaneStatusFlushInterval.Duration / 10
		store := xds_callbacks.NewDataplaneInsightStore(rt.ResourceManager())

		return xds_callbacks.NewDataplaneInsightSink(
			dataplaneType,
			accessor,
			flushTicker,
			fixedTicker,
			tenthOfFlush,
			store,
		)
	}
	return xds_callbacks.NewDataplaneStatusTracker(rt, newSink)
}