blob: 5bcbf39e0610b90cbc00f4834cf5d3498316bcf0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package admin
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net"
"net/http"
"net/url"
"time"
)
import (
envoy_admin_v3 "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
"github.com/pkg/errors"
)
import (
core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
util_proto "github.com/apache/dubbo-kubernetes/pkg/util/proto"
)
type EnvoyAdminClient interface {
PostQuit(ctx context.Context, dataplane *core_mesh.DataplaneResource) error
Stats(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error)
Clusters(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error)
ConfigDump(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error)
}
type envoyAdminClient struct {
rm manager.ResourceManager
defaultAdminPort uint32
caCertPool *x509.CertPool
clientCert *tls.Certificate
}
func NewEnvoyAdminClient(rm manager.ResourceManager, adminPort uint32) EnvoyAdminClient {
client := &envoyAdminClient{
rm: rm,
defaultAdminPort: adminPort,
}
return client
}
func (a *envoyAdminClient) buildHTTPClient(ctx context.Context) (*http.Client, error) {
c := &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 3 * time.Second,
}).DialContext,
},
Timeout: 5 * time.Second,
}
return c, nil
}
const (
quitquitquit = "quitquitquit"
)
func (a *envoyAdminClient) PostQuit(ctx context.Context, dataplane *core_mesh.DataplaneResource) error {
httpClient, err := a.buildHTTPClient(ctx)
if err != nil {
return err
}
url := fmt.Sprintf("https://%s/%s", dataplane.AdminAddress(a.defaultAdminPort), quitquitquit)
request, err := http.NewRequestWithContext(ctx, "POST", url, nil)
if err != nil {
return err
}
// Envoy will not send back any response, so do we not check the response
response, err := httpClient.Do(request)
if errors.Is(err, io.EOF) {
return nil // Envoy may not respond correctly for this request because it already started the shut-down process.
}
if err != nil {
return errors.Wrapf(err, "unable to send POST to %s", quitquitquit)
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return errors.Errorf("envoy response [%d %s] [%s]", response.StatusCode, response.Status, response.Body)
}
return nil
}
func (a *envoyAdminClient) Stats(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
return a.executeRequest(ctx, proxy, "stats")
}
func (a *envoyAdminClient) Clusters(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
return a.executeRequest(ctx, proxy, "clusters")
}
func (a *envoyAdminClient) ConfigDump(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
configDump, err := a.executeRequest(ctx, proxy, "config_dump")
if err != nil {
return nil, err
}
cd := &envoy_admin_v3.ConfigDump{}
if err := util_proto.FromJSON(configDump, cd); err != nil {
return nil, err
}
if err := Sanitize(cd); err != nil {
return nil, err
}
return util_proto.ToJSONIndent(cd, " ")
}
func (a *envoyAdminClient) executeRequest(ctx context.Context, proxy core_model.ResourceWithAddress, path string) ([]byte, error) {
var httpClient *http.Client
var err error
u := &url.URL{}
switch proxy.(type) {
case *core_mesh.DataplaneResource:
httpClient, err = a.buildHTTPClient(ctx)
if err != nil {
return nil, err
}
u.Scheme = "https"
case *core_mesh.ZoneIngressResource, *core_mesh.ZoneEgressResource:
httpClient, err = a.buildHTTPClient(ctx)
if err != nil {
return nil, err
}
u.Scheme = "https"
default:
return nil, errors.New("unsupported proxy type")
}
if host, _, err := net.SplitHostPort(proxy.AdminAddress(a.defaultAdminPort)); err == nil && host == "127.0.0.1" {
httpClient = &http.Client{
Timeout: 5 * time.Second,
}
u.Scheme = "http"
}
u.Host = proxy.AdminAddress(a.defaultAdminPort)
u.Path = path
request, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return nil, err
}
response, err := httpClient.Do(request)
if err != nil {
return nil, errors.Wrapf(err, "unable to send GET to %s", "config_dump")
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return nil, errors.Errorf("envoy response [%d %s] [%s]", response.StatusCode, response.Status, response.Body)
}
resp, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}
return resp, nil
}