blob: ff02e46663939d2492eb99a7a705ec653f98db52 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zone
import (
"github.com/pkg/errors"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/config"
"github.com/apache/dubbo-kubernetes/pkg/config/core/resources/store"
"github.com/apache/dubbo-kubernetes/pkg/core"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/registry"
core_runtime "github.com/apache/dubbo-kubernetes/pkg/core/runtime"
"github.com/apache/dubbo-kubernetes/pkg/core/runtime/component"
dds_client "github.com/apache/dubbo-kubernetes/pkg/dds/client"
"github.com/apache/dubbo-kubernetes/pkg/dds/mux"
dds_server "github.com/apache/dubbo-kubernetes/pkg/dds/server"
dds_sync_store "github.com/apache/dubbo-kubernetes/pkg/dds/store"
resources_k8s "github.com/apache/dubbo-kubernetes/pkg/plugins/resources/k8s"
)
var ddsDeltaZoneLog = core.Log.WithName("dds-delta-zone")
func Setup(rt core_runtime.Runtime) error {
if !rt.Config().IsFederatedZoneCP() {
return nil
}
zone := rt.Config().Multizone.Zone.Name
reg := registry.Global()
ddsCtx := rt.DDSContext()
ddsServer, err := dds_server.New(
ddsDeltaZoneLog,
rt,
reg.ObjectTypes(model.HasDDSFlag(model.ZoneToGlobalFlag)),
zone,
rt.Config().Multizone.Zone.DDS.RefreshInterval.Duration,
ddsCtx.ZoneProvidedFilter,
ddsCtx.ZoneResourceMapper,
rt.Config().Multizone.Zone.DDS.NackBackoff.Duration,
)
if err != nil {
return err
}
resourceSyncer, err := dds_sync_store.NewResourceSyncer(ddsDeltaZoneLog, rt.ResourceManager(), rt.Transactions(), rt.Extensions())
if err != nil {
return err
}
kubeFactory := resources_k8s.NewSimpleKubeFactory()
cfg := rt.Config()
cfgForDisplay, err := config.ConfigForDisplay(&cfg)
if err != nil {
return errors.Wrap(err, "could not construct config for display")
}
cfgJson, err := config.ToJson(cfgForDisplay)
if err != nil {
return errors.Wrap(err, "could not marshall config to json")
}
onGlobalToZoneSyncStarted := mux.OnGlobalToZoneSyncStartedFunc(func(stream mesh_proto.DDSSyncService_GlobalToZoneSyncClient, errChan chan error) {
log := ddsDeltaZoneLog.WithValues("kds-version", "v2")
syncClient := dds_client.NewDDSSyncClient(
log,
reg.ObjectTypes(model.HasDDSFlag(model.GlobalToZoneSelector)),
dds_client.NewDeltaDDSStream(stream, zone, rt, string(cfgJson)),
dds_sync_store.ZoneSyncCallback(
stream.Context(),
rt.DDSContext().Configs,
resourceSyncer,
rt.Config().Store.Type == store.KubernetesStore,
zone,
kubeFactory,
rt.Config().Store.Kubernetes.SystemNamespace,
),
rt.Config().Multizone.Zone.DDS.ResponseBackoff.Duration,
)
go func() {
if err := syncClient.Receive(); err != nil {
errChan <- errors.Wrap(err, "GlobalToZoneSyncClient finished with an error")
} else {
log.V(1).Info("GlobalToZoneSyncClient finished gracefully")
}
}()
})
onZoneToGlobalSyncStarted := mux.OnZoneToGlobalSyncStartedFunc(func(stream mesh_proto.DDSSyncService_ZoneToGlobalSyncClient, errChan chan error) {
log := ddsDeltaZoneLog.WithValues("kds-version", "v2", "peer-id", "global")
log.Info("ZoneToGlobalSync new session created")
session := dds_server.NewServerStream(stream)
go func() {
if err := ddsServer.ZoneToGlobal(session); err != nil {
errChan <- errors.Wrap(err, "ZoneToGlobalSync finished with an error")
} else {
log.V(1).Info("ZoneToGlobalSync finished gracefully")
}
}()
})
muxClient := mux.NewClient(
rt.DDSContext().ZoneClientCtx,
rt.Config().Multizone.Zone.GlobalAddress,
zone,
onGlobalToZoneSyncStarted,
onZoneToGlobalSyncStarted,
*rt.Config().Multizone.Zone.DDS,
)
return rt.Add(component.NewResilientComponent(ddsDeltaZoneLog.WithName("kds-mux-client"), muxClient))
}