blob: 474b180c4143dd8243a6a6c791ef34768e39e43e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package admin
import (
"context"
"fmt"
"reflect"
)
import (
"github.com/pkg/errors"
)
import (
mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core"
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/dds/service"
util_grpc "github.com/apache/dubbo-kubernetes/pkg/util/grpc"
"github.com/apache/dubbo-kubernetes/pkg/util/k8s"
)
type ddsEnvoyAdminClient struct {
rpcs service.EnvoyAdminRPCs
k8sStore bool
}
func NewDDSEnvoyAdminClient(rpcs service.EnvoyAdminRPCs, k8sStore bool) EnvoyAdminClient {
return &ddsEnvoyAdminClient{
rpcs: rpcs,
k8sStore: k8sStore,
}
}
var _ EnvoyAdminClient = &ddsEnvoyAdminClient{}
func (k *ddsEnvoyAdminClient) PostQuit(context.Context, *core_mesh.DataplaneResource) error {
panic("not implemented")
}
func (k *ddsEnvoyAdminClient) ConfigDump(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
zone := core_model.ZoneOfResource(proxy)
nameInZone := resNameInZone(proxy)
reqId := core.NewUUID()
tenantZoneID := service.ZoneClientIDFromCtx(ctx, zone)
err := k.rpcs.XDSConfigDump.Send(tenantZoneID.String(), &mesh_proto.XDSConfigRequest{
RequestId: reqId,
ResourceType: string(proxy.Descriptor().Name),
ResourceName: nameInZone, // send the name which without the added prefix
ResourceMesh: proxy.GetMeta().GetMesh(), // should be empty for ZoneIngress/ZoneEgress
})
if err != nil {
return nil, &DDSTransportError{requestType: "XDSConfigRequest", reason: err.Error()}
}
defer k.rpcs.XDSConfigDump.DeleteWatch(tenantZoneID.String(), reqId)
ch := make(chan util_grpc.ReverseUnaryMessage)
if err := k.rpcs.XDSConfigDump.WatchResponse(tenantZoneID.String(), reqId, ch); err != nil {
return nil, errors.Wrapf(err, "could not watch the response")
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-ch:
configResp, ok := resp.(*mesh_proto.XDSConfigResponse)
if !ok {
return nil, errors.New("invalid request type")
}
if configResp.GetError() != "" {
return nil, &DDSTransportError{requestType: "XDSConfigRequest", reason: configResp.GetError()}
}
return configResp.GetConfig(), nil
}
}
func (k *ddsEnvoyAdminClient) Stats(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
zone := core_model.ZoneOfResource(proxy)
nameInZone := resNameInZone(proxy)
reqId := core.NewUUID()
tenantZoneId := service.ZoneClientIDFromCtx(ctx, zone)
err := k.rpcs.Stats.Send(tenantZoneId.String(), &mesh_proto.StatsRequest{
RequestId: reqId,
ResourceType: string(proxy.Descriptor().Name),
ResourceName: nameInZone, // send the name which without the added prefix
ResourceMesh: proxy.GetMeta().GetMesh(), // should be empty for ZoneIngress/ZoneEgress
})
if err != nil {
return nil, &DDSTransportError{requestType: "StatsRequest", reason: err.Error()}
}
defer k.rpcs.Stats.DeleteWatch(tenantZoneId.String(), reqId)
ch := make(chan util_grpc.ReverseUnaryMessage)
if err := k.rpcs.Stats.WatchResponse(tenantZoneId.String(), reqId, ch); err != nil {
return nil, errors.Wrapf(err, "could not watch the response")
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-ch:
statsResp, ok := resp.(*mesh_proto.StatsResponse)
if !ok {
return nil, errors.New("invalid request type")
}
if statsResp.GetError() != "" {
return nil, &DDSTransportError{requestType: "StatsRequest", reason: statsResp.GetError()}
}
return statsResp.GetStats(), nil
}
}
func (k *ddsEnvoyAdminClient) Clusters(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
zone := core_model.ZoneOfResource(proxy)
nameInZone := resNameInZone(proxy)
reqId := core.NewUUID()
tenantZoneID := service.ZoneClientIDFromCtx(ctx, zone)
err := k.rpcs.Clusters.Send(tenantZoneID.String(), &mesh_proto.ClustersRequest{
RequestId: reqId,
ResourceType: string(proxy.Descriptor().Name),
ResourceName: nameInZone, // send the name which without the added prefix
ResourceMesh: proxy.GetMeta().GetMesh(), // should be empty for ZoneIngress/ZoneEgress
})
if err != nil {
return nil, &DDSTransportError{requestType: "ClustersRequest", reason: err.Error()}
}
defer k.rpcs.Clusters.DeleteWatch(tenantZoneID.String(), reqId)
ch := make(chan util_grpc.ReverseUnaryMessage)
if err := k.rpcs.Clusters.WatchResponse(tenantZoneID.String(), reqId, ch); err != nil {
return nil, errors.Wrapf(err, "could not watch the response")
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case resp := <-ch:
clustersResp, ok := resp.(*mesh_proto.ClustersResponse)
if !ok {
return nil, errors.New("invalid request type")
}
if clustersResp.GetError() != "" {
return nil, &DDSTransportError{requestType: "ClustersRequest", reason: clustersResp.GetError()}
}
return clustersResp.GetClusters(), nil
}
}
func resNameInZone(r core_model.Resource) string {
name := core_model.GetDisplayName(r)
if ns := r.GetMeta().GetLabels()[mesh_proto.KubeNamespaceTag]; ns != "" {
name = k8s.K8sNamespacedNameToCoreName(name, ns)
}
return name
}
type DDSTransportError struct {
requestType string
reason string
}
func (e *DDSTransportError) Error() string {
if e.reason == "" {
return fmt.Sprintf("could not send %s", e.requestType)
} else {
return fmt.Sprintf("could not send %s: %s", e.requestType, e.reason)
}
}
func (e *DDSTransportError) Is(err error) bool {
return reflect.TypeOf(e) == reflect.TypeOf(err)
}