blob: 1a86f9c2ccc0a568b069397718c765519817ec00 [file] [log] [blame]
package main
import (
"encoding/binary"
"fmt"
"math/rand"
"net"
"os"
"time"
)
import (
envoy_resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"go.uber.org/multierr"
)
import (
"github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model/rest/unversioned"
rest_v1alpha1 "github.com/apache/dubbo-kubernetes/pkg/core/resources/model/rest/v1alpha1"
dubbo_log "github.com/apache/dubbo-kubernetes/pkg/log"
"github.com/apache/dubbo-kubernetes/tools/xds-client/stream"
)
func newRootCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "dubbo-xds-client",
Short: "dubbo xDS client",
Long: `dubbo xDS client.`,
PersistentPreRun: func(_ *cobra.Command, _ []string) {
core.SetLogger(core.NewLogger(dubbo_log.DebugLevel))
},
}
// sub-commands
cmd.AddCommand(newRunCmd())
return cmd
}
func newRunCmd() *cobra.Command {
log := core.Log.WithName("dubbo-xds-client").WithName("run")
args := struct {
xdsServerAddress string
dps int
services int
inbounds int
outbounds int
rampUpPeriod time.Duration
}{
xdsServerAddress: "grpc://localhost:5678",
dps: 100,
services: 50,
inbounds: 1,
outbounds: 3,
rampUpPeriod: 30 * time.Second,
}
cmd := &cobra.Command{
Use: "run",
Short: "Start xDS client(s) that simulate Envoy",
Long: `Start xDS client(s) that simulate Envoy.`,
RunE: func(cmd *cobra.Command, _ []string) error {
ipRand := rand.Uint32() // #nosec G404 -- that's just a test tool
log.Info("going to start xDS clients (Envoy simulators)", "dps", args.dps)
errCh := make(chan error, 1)
for i := 0; i < args.dps; i++ {
id := fmt.Sprintf("default.dataplane-%d", i)
nodeLog := log.WithName("envoy-simulator").WithValues("idx", i, "ID", id)
nodeLog.Info("creating an xDS client ...")
go func(i int) {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, ipRand+uint32(i))
ip := net.IP(buf).String()
dpSpec := &v1alpha1.Dataplane{
Networking: &v1alpha1.Dataplane_Networking{
Address: ip,
},
}
for j := 0; j < args.inbounds; j++ {
service := fmt.Sprintf("service-%d", rand.Int()%args.services) // #nosec G404 -- that's just a test tool
dpSpec.Networking.Inbound = append(dpSpec.Networking.Inbound, &v1alpha1.Dataplane_Networking_Inbound{
Port: uint32(8080 + j),
Tags: map[string]string{
v1alpha1.ServiceTag: service,
v1alpha1.ProtocolTag: "http",
},
})
}
for j := 0; j < args.outbounds; j++ {
service := fmt.Sprintf("service-%d", rand.Int()%args.services) // #nosec G404 -- that's just a test tool
dpSpec.Networking.Outbound = append(dpSpec.Networking.Outbound, &v1alpha1.Dataplane_Networking_Outbound{
Port: uint32(10080 + j), Tags: map[string]string{v1alpha1.ServiceTag: service},
})
}
dp := &unversioned.Resource{
Meta: rest_v1alpha1.ResourceMeta{Mesh: "default", Name: fmt.Sprintf("dataplane-%d.dubbo-system", i), Type: "Dataplane"},
Spec: dpSpec,
}
// add some jitter
delay := time.Duration(int64(float64(args.rampUpPeriod.Nanoseconds()) * rand.Float64())) // #nosec G404 -- that's just a test tool
// wait
<-time.After(delay)
// proceed
errCh <- func() (errs error) {
client, err := stream.New(args.xdsServerAddress)
if err != nil {
return errors.Wrap(err, "failed to connect to xDS server")
}
defer func() {
nodeLog.Info("closing a connection ...")
if err := client.Close(); err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "failed to close a connection"))
}
}()
nodeLog.Info("opening an xDS stream ...")
stream, err := client.StartStream()
if err != nil {
return errors.Wrap(err, "failed to start an xDS stream")
}
defer func() {
nodeLog.Info("closing an xDS stream ...")
if err := stream.Close(); err != nil {
errs = multierr.Append(errs, errors.Wrapf(err, "failed to close an xDS stream"))
}
}()
nodeLog.Info("requesting Listeners")
e := stream.Request(id, envoy_resource.ListenerType, dp)
if e != nil {
return errors.Wrapf(e, "failed to request %q", envoy_resource.ListenerType)
}
nodeLog.Info("requesting Clusters")
e = stream.Request(id, envoy_resource.ClusterType, dp)
if e != nil {
return errors.Wrapf(e, "failed to request %q", envoy_resource.ClusterType)
}
nodeLog.Info("requesting Endpoints")
e = stream.Request(id, envoy_resource.EndpointType, dp)
if e != nil {
return errors.Wrapf(e, "failed to request %q", envoy_resource.EndpointType)
}
for {
nodeLog.Info("waiting for a discovery response ...")
resp, err := stream.WaitForResources()
if err != nil {
return errors.Wrap(err, "failed to receive a discovery response")
}
nodeLog.Info("received xDS resources", "type", resp.TypeUrl, "version", resp.VersionInfo, "nonce", resp.Nonce, "resources", len(resp.Resources))
if err := stream.ACK(resp.TypeUrl); err != nil {
return errors.Wrap(err, "failed to ACK a discovery response")
}
nodeLog.Info("ACKed discovery response", "type", resp.TypeUrl, "version", resp.VersionInfo, "nonce", resp.Nonce)
}
}()
}(i)
}
err := <-errCh
return errors.Wrap(err, "one of xDS clients (Envoy simulators) terminated with an error")
},
}
// flags
cmd.PersistentFlags().StringVar(&args.xdsServerAddress, "xds-server-address", args.xdsServerAddress, "address of xDS server")
cmd.PersistentFlags().IntVar(&args.dps, "dps", args.dps, "number of dataplanes to emulate")
cmd.PersistentFlags().IntVar(&args.services, "services", args.services, "number of services")
cmd.PersistentFlags().IntVar(&args.inbounds, "inbounds", args.inbounds, "number of inbounds")
cmd.PersistentFlags().IntVar(&args.outbounds, "outbounds", args.outbounds, "number of outbounds")
cmd.PersistentFlags().DurationVar(&args.rampUpPeriod, "rampup-period", args.rampUpPeriod, "ramp up period")
return cmd
}
func main() {
if err := newRootCmd().Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}