blob: f0bdfb662d8e013ec0e6caf5cc1556609db35e20 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tracker
import (
"context"
)
import (
envoy_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
envoy_service_health "github.com/envoyproxy/go-control-plane/envoy/service/health/v3"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)
import (
dp_server "github.com/apache/dubbo-kubernetes/pkg/config/dp-server"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
"github.com/apache/dubbo-kubernetes/pkg/core/xds"
"github.com/apache/dubbo-kubernetes/pkg/hds/cache"
util_proto "github.com/apache/dubbo-kubernetes/pkg/util/proto"
util_xds_v3 "github.com/apache/dubbo-kubernetes/pkg/util/xds/v3"
"github.com/apache/dubbo-kubernetes/pkg/xds/envoy/names"
)
type SnapshotGenerator struct {
config *dp_server.HdsConfig
readOnlyResourceManager manager.ReadOnlyResourceManager
defaultAdminPort uint32
}
func NewSnapshotGenerator(
readOnlyResourceManager manager.ReadOnlyResourceManager,
config *dp_server.HdsConfig,
defaultAdminPort uint32,
) *SnapshotGenerator {
return &SnapshotGenerator{
readOnlyResourceManager: readOnlyResourceManager,
config: config,
defaultAdminPort: defaultAdminPort,
}
}
func (g *SnapshotGenerator) GenerateSnapshot(ctx context.Context, node *envoy_core.Node) (util_xds_v3.Snapshot, error) {
proxyId, err := xds.ParseProxyIdFromString(node.Id)
if err != nil {
return nil, err
}
dp := mesh.NewDataplaneResource()
if err := g.readOnlyResourceManager.Get(ctx, dp, store.GetBy(proxyId.ToResourceKey())); err != nil {
return nil, err
}
healthChecks := []*envoy_service_health.ClusterHealthCheck{
g.envoyHealthCheck(dp.AdminPort(g.defaultAdminPort)),
}
for _, inbound := range dp.Spec.GetNetworking().GetInbound() {
if inbound.ServiceProbe == nil {
continue
}
serviceProbe := inbound.ServiceProbe
intf := dp.Spec.GetNetworking().ToInboundInterface(inbound)
var timeout *durationpb.Duration
if serviceProbe.Timeout == nil {
timeout = util_proto.Duration(g.config.CheckDefaults.Timeout.Duration)
} else {
timeout = serviceProbe.Timeout
}
var interval *durationpb.Duration
if serviceProbe.Timeout == nil {
interval = util_proto.Duration(g.config.CheckDefaults.Interval.Duration)
} else {
interval = serviceProbe.Interval
}
var healthyThreshold *wrapperspb.UInt32Value
if serviceProbe.HealthyThreshold == nil {
healthyThreshold = util_proto.UInt32(g.config.CheckDefaults.HealthyThreshold)
} else {
healthyThreshold = serviceProbe.HealthyThreshold
}
var unhealthyThreshold *wrapperspb.UInt32Value
if serviceProbe.UnhealthyThreshold == nil {
unhealthyThreshold = util_proto.UInt32(g.config.CheckDefaults.UnhealthyThreshold)
} else {
unhealthyThreshold = serviceProbe.UnhealthyThreshold
}
hc := &envoy_service_health.ClusterHealthCheck{
ClusterName: names.GetLocalClusterName(intf.WorkloadPort),
LocalityEndpoints: []*envoy_service_health.LocalityEndpoints{{
Endpoints: []*envoy_endpoint.Endpoint{{
Address: &envoy_core.Address{
Address: &envoy_core.Address_SocketAddress{
SocketAddress: &envoy_core.SocketAddress{
Address: intf.WorkloadIP,
PortSpecifier: &envoy_core.SocketAddress_PortValue{
PortValue: intf.WorkloadPort,
},
},
},
},
}},
}},
HealthChecks: []*envoy_core.HealthCheck{
{
Timeout: timeout,
Interval: interval,
HealthyThreshold: healthyThreshold,
UnhealthyThreshold: unhealthyThreshold,
NoTrafficInterval: util_proto.Duration(g.config.CheckDefaults.NoTrafficInterval.Duration),
HealthChecker: &envoy_core.HealthCheck_TcpHealthCheck_{
TcpHealthCheck: &envoy_core.HealthCheck_TcpHealthCheck{},
},
},
},
}
healthChecks = append(healthChecks, hc)
}
hcs := &envoy_service_health.HealthCheckSpecifier{
ClusterHealthChecks: healthChecks,
Interval: util_proto.Duration(g.config.Interval.Duration),
}
return cache.NewSnapshot("", hcs), nil
}
// envoyHealthCheck builds a HC for Envoy itself so when Envoy is in draining state HDS can report that DP is offline
func (g *SnapshotGenerator) envoyHealthCheck(port uint32) *envoy_service_health.ClusterHealthCheck {
return &envoy_service_health.ClusterHealthCheck{
ClusterName: names.GetEnvoyAdminClusterName(),
LocalityEndpoints: []*envoy_service_health.LocalityEndpoints{{
Endpoints: []*envoy_endpoint.Endpoint{{
Address: &envoy_core.Address{
Address: &envoy_core.Address_SocketAddress{
SocketAddress: &envoy_core.SocketAddress{
Address: "127.0.0.1",
PortSpecifier: &envoy_core.SocketAddress_PortValue{
PortValue: port,
},
},
},
},
}},
}},
HealthChecks: []*envoy_core.HealthCheck{
{
Timeout: util_proto.Duration(g.config.CheckDefaults.Timeout.Duration),
Interval: util_proto.Duration(g.config.CheckDefaults.Interval.Duration),
HealthyThreshold: util_proto.UInt32(g.config.CheckDefaults.HealthyThreshold),
UnhealthyThreshold: util_proto.UInt32(g.config.CheckDefaults.UnhealthyThreshold),
NoTrafficInterval: util_proto.Duration(g.config.CheckDefaults.NoTrafficInterval.Duration),
HealthChecker: &envoy_core.HealthCheck_HttpHealthCheck_{
HttpHealthCheck: &envoy_core.HealthCheck_HttpHealthCheck{
Path: "/ready",
},
},
},
},
}
}