Merge pull request #1 from crystaldust/feature/istiopilotv2-discovery
Feature:istiopilotv2 discovery
diff --git a/.gitignore b/.gitignore
index 6a5366d..e50b616 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,4 +28,4 @@
_build
coverage.txt
-go.sum
\ No newline at end of file
+go.sum
diff --git a/.travis.yml b/.travis.yml
index bf63989..4f90ccd 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,6 +8,7 @@
- mkdir -p $HOME/gopath/src/github.com/go-mesh/mesher
- rsync -az ${TRAVIS_BUILD_DIR}/ $HOME/gopath/src/github.com/go-mesh/mesher
- export TRAVIS_BUILD_DIR=$HOME/gopath/src/github.com/go-mesh/mesher
+ - export KUBE_CONFIG=$HOME/gopath/src/github.com/go-mesh/mesher/vendor/github.com/go-mesh/mesher-tools/test/util/sample_kubeconfig
- cd $HOME/gopath/src/github.com/go-mesh/mesher
jobs:
include:
diff --git a/go.mod b/go.mod
index 46092d8..bbadbc6 100644
--- a/go.mod
+++ b/go.mod
@@ -1,27 +1,79 @@
module github.com/go-mesh/mesher
-replace (
- golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac => github.com/golang/crypto v0.0.0-20180820150726-614d502a4dac
- golang.org/x/net v0.0.0-20180824152047-4bcd98cce591 => github.com/golang/net v0.0.0-20180824152047-4bcd98cce591
- golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87 => github.com/golang/sys v0.0.0-20180824143301-4910a1d54f87
- golang.org/x/text v0.3.0 => github.com/golang/text v0.3.0
- golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 => github.com/golang/time v0.0.0-20180412165947-fbb02b2291d2
- google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 => github.com/google/go-genproto v0.0.0-20180817151627-c66870c02cf8
- google.golang.org/grpc v1.14.0 => github.com/grpc/grpc-go v1.14.0
-)
-
require (
+ cloud.google.com/go v0.28.0 // indirect
+ code.cloudfoundry.org/copilot v0.0.0-20180928002835-76734bdb9045 // indirect
+ fortio.org/fortio v1.3.0 // indirect
github.com/emicklei/go-restful-swagger12 v0.0.0-20170926063155-7524189396c6 // indirect
+ github.com/envoyproxy/go-control-plane v0.6.0
+
github.com/go-chassis/go-cc-client v0.0.0-20180831085349-c2bb6cef1640
github.com/go-chassis/go-chassis v0.8.3-0.20180914033538-0791a5cec8b4
github.com/go-chassis/gohessian v0.0.0-20180702061429-e5130c25af55
+ github.com/go-mesh/mesher-tools v0.0.0-20181006103649-cdc091b78a72
github.com/go-mesh/openlogging v0.0.0-20180831021158-f5d1c4e7e506
github.com/gogo/protobuf v1.1.1
+ github.com/gogo/status v1.0.3 // indirect
+ github.com/golang/groupcache v0.0.0-20180924190550-6f2cf27854a4 // indirect
+ github.com/golang/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
+ github.com/google/go-github v17.0.0+incompatible // indirect
+ github.com/google/go-querystring v1.0.0 // indirect
+ github.com/google/uuid v1.0.0 // indirect
+ github.com/gorilla/context v1.1.1 // indirect
+ github.com/gorilla/mux v1.6.2 // indirect
+ github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
+ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
+ github.com/hashicorp/consul v1.2.3 // indirect
+ github.com/hashicorp/go-cleanhttp v0.5.0 // indirect
+ github.com/hashicorp/go-multierror v1.0.0 // indirect
+ github.com/hashicorp/go-rootcerts v0.0.0-20160503143440-6bb64b370b90 // indirect
+ github.com/hashicorp/serf v0.8.1 // indirect
+ github.com/howeyc/fsnotify v0.9.0 // indirect
+ github.com/inconshreveable/mousetrap v1.0.0 // indirect
+ github.com/mitchellh/go-homedir v1.0.0 // indirect
+ github.com/mitchellh/mapstructure v1.0.0 // indirect
+ github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
+ github.com/pkg/errors v0.8.0 // indirect
github.com/prometheus/client_golang v0.8.0
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
+ github.com/prometheus/prom2json v0.0.0-20180620215746-7b8ed2aed129 // indirect
+ github.com/spf13/cobra v0.0.3 // indirect
github.com/stretchr/testify v1.2.2
github.com/urfave/cli v0.0.0-20180821064027-934abfb2f102
+ go.uber.org/atomic v1.3.2 // indirect
+ go.uber.org/multierr v1.1.0 // indirect
+ go.uber.org/zap v1.9.1 // indirect
+ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be // indirect
+ google.golang.org/appengine v1.2.0 // indirect
google.golang.org/grpc v1.14.0
+ gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 // indirect
gopkg.in/yaml.v2 v2.2.1
+ istio.io/api v0.0.0-20180926203357-0cf306e2fd19 // indirect
+ istio.io/fortio v1.3.0 // indirect
+ istio.io/istio v0.0.0-20180929031539-a8986e2dc2c3
+ k8s.io/apiextensions-apiserver v0.0.0-20180925155151-ce69c54e57693220512104c84941e2ef1876449a // indirect
+ k8s.io/apimachinery v0.0.0-20180823151430-fda675fbe85280c4550452dae2a5ebf74e4a59b7
+ k8s.io/client-go v8.0.0+incompatible
+ k8s.io/cluster-registry v0.0.6 // indirect
+
+ k8s.io/ingress v0.0.0-20170803151325-fe19ebb09ee2 // indirect
+ k8s.io/kube-openapi v0.0.0-20180928070517-c01ed926f124 // indirect
+)
+
+replace (
+ cloud.google.com/go v0.28.0 => github.com/GoogleCloudPlatform/google-cloud-go v0.28.0
+ github.com/envoyproxy/go-control-plane v0.6.0 => github.com/envoyproxy/go-control-plane v0.0.0-20180918192855-2137d919632883e52e7786f55f0f84e52a44fbf3
+ github.com/kubernetes/client-go => ../k8s.io/client-go
+ golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac => github.com/golang/crypto v0.0.0-20180820150726-614d502a4dac
+ golang.org/x/net v0.0.0-20180724234803-3673e40ba225 => github.com/golang/net v0.0.0-20180724234803-3673e40ba225
+ golang.org/x/net v0.0.0-20180824152047-4bcd98cce591 => github.com/golang/net v0.0.0-20180824152047-4bcd98cce591
+
+ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be => github.com/golang/oauth2 v0.0.0-20180821212333-d2e6202438be
+ golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87 => github.com/golang/sys v0.0.0-20180824143301-4910a1d54f87
+ golang.org/x/text v0.3.0 => github.com/golang/text v0.3.0
+ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 => github.com/golang/time v0.0.0-20180412165947-fbb02b2291d2
+ google.golang.org/appengine v1.2.0 => github.com/golang/appengine v1.2.0
+ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 => github.com/google/go-genproto v0.0.0-20180817151627-c66870c02cf8
+ google.golang.org/grpc v1.14.0 => github.com/grpc/grpc-go v1.14.0
)
diff --git a/pkg/infras/istio/xds.go b/pkg/infras/istio/xds.go
new file mode 100644
index 0000000..25d95a2
--- /dev/null
+++ b/pkg/infras/istio/xds.go
@@ -0,0 +1,458 @@
+package pilotv2
+
+import (
+ "context"
+ "crypto/tls"
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ apiv2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+ apiv2core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+ apiv2endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
+ apiv2route "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
+ k8sinfra "github.com/go-mesh/mesher/pkg/infras/k8s"
+
+ "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
+ "github.com/go-mesh/openlogging"
+ "github.com/gogo/protobuf/proto"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "k8s.io/client-go/rest"
+)
+
+//XdsClient provides the XDS API calls.
+type XdsClient struct {
+ PilotAddr string
+ TlsConfig *tls.Config
+ ReqCaches map[XdsType]*XdsReqCache
+ nodeInfo *NodeInfo
+ NodeID string
+ NodeCluster string
+ k8sClient *rest.RESTClient
+}
+
+//XdsType is the wrapper of string, the wrapper type should be "cds", "eds", "lds" or "rds"
+type XdsType string
+
+const (
+ TypeCds XdsType = "cds"
+ TypeEds XdsType = "eds"
+ TypeLds XdsType = "lds"
+ TypeRds XdsType = "rds"
+)
+
+//XdsReqCache stores the VersionInfo and Nonce for the XDS calls
+type XdsReqCache struct {
+ Nonce string
+ VersionInfo string
+}
+
+//NodeInfo stores the info of the node, which will be used to make a
+//XDS call
+type NodeInfo struct {
+ PodName string
+ Namespace string
+ InstanceIP string
+}
+
+//XdsClusterInfo stores all the infos from a cluster name, which is in
+//the format direction|port|subset|hostName
+type XdsClusterInfo struct {
+ ClusterName string
+ Direction string
+ Port string
+ Subset string
+ HostName string
+ ServiceName string
+ Namespace string
+ DomainSuffix string // DomainSuffix might not be used
+ Tags map[string]string
+ Addrs []string // The accessible addresses of the endpoints
+}
+
+//NewXdsClient returns the new XDS client.
+func NewXdsClient(pilotAddr string, tlsConfig *tls.Config, nodeInfo *NodeInfo, kubeconfigPath string) (*XdsClient, error) {
+ // TODO Handle the array
+ xdsClient := &XdsClient{
+ PilotAddr: pilotAddr,
+ nodeInfo: nodeInfo,
+ }
+ xdsClient.NodeID = "sidecar~" + nodeInfo.InstanceIP + "~" + nodeInfo.PodName + "~" + nodeInfo.Namespace
+ xdsClient.NodeCluster = nodeInfo.PodName
+
+ xdsClient.ReqCaches = map[XdsType]*XdsReqCache{
+ TypeCds: {},
+ TypeEds: {},
+ TypeLds: {},
+ TypeRds: {},
+ }
+
+ if k8sClient, err := k8sinfra.CreateK8SRestClient(kubeconfigPath, "apis", "networking.istio.io", "v1alpha3"); err != nil {
+ return nil, err
+ } else {
+ xdsClient.k8sClient = k8sClient
+ }
+
+ return xdsClient, nil
+}
+
+//GetSubsetTags returns the tags of the specified subset.
+func (client *XdsClient) GetSubsetTags(namespace, hostName, subsetName string) (map[string]string, error) {
+ req := client.k8sClient.Get()
+ req.Resource("destinationrules")
+ req.Namespace(namespace)
+
+ result := req.Do()
+ rawBody, err := result.Raw()
+ if err != nil {
+ return nil, err
+ }
+
+ var drResult k8sinfra.DestinationRuleResult
+ if err := json.Unmarshal(rawBody, &drResult); err != nil {
+ return nil, err
+ }
+
+ // Find the subset
+ tags := map[string]string{}
+ for _, dr := range drResult.Items {
+ if dr.Spec.Host == hostName {
+ for _, subset := range dr.Spec.Subsets {
+ if subset.Name == subsetName {
+ for k, v := range subset.Labels {
+ tags[k] = v
+ }
+ break
+ }
+ }
+ break
+ }
+ }
+
+ return tags, nil
+}
+
+func (client *XdsClient) getGrpcConn() (*grpc.ClientConn, error) {
+ var conn *grpc.ClientConn
+ var err error
+ if client.TlsConfig != nil {
+ creds := credentials.NewTLS(client.TlsConfig)
+ conn, err = grpc.Dial(client.PilotAddr, grpc.WithTransportCredentials(creds))
+ } else {
+ conn, err = grpc.Dial(client.PilotAddr, grpc.WithInsecure())
+ }
+
+ return conn, err
+}
+
+func getAdsResClient(client *XdsClient) (v2.AggregatedDiscoveryService_StreamAggregatedResourcesClient, *grpc.ClientConn, error) {
+ conn, err := client.getGrpcConn()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ adsClient := v2.NewAggregatedDiscoveryServiceClient(conn)
+ adsResClient, err := adsClient.StreamAggregatedResources(context.Background())
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return adsResClient, conn, nil
+}
+
+func (client *XdsClient) getRouterClusters(clusterName string) ([]string, error) {
+ virtualHosts, err := client.RDS(clusterName)
+ if err != nil {
+ return nil, err
+ }
+
+ routerClusters := []string{}
+ for _, h := range virtualHosts {
+ for _, r := range h.Routes {
+ routerClusters = append(routerClusters, r.GetRoute().GetCluster())
+ }
+ }
+
+ return routerClusters, nil
+}
+
+func (client *XdsClient) getVersionInfo(resType XdsType) string {
+ return client.ReqCaches[resType].VersionInfo
+}
+func (client *XdsClient) getNonce(resType XdsType) string {
+ return client.ReqCaches[resType].Nonce
+}
+
+func (client *XdsClient) setVersionInfo(resType XdsType, versionInfo string) {
+ client.ReqCaches[resType].VersionInfo = versionInfo
+}
+
+func (client *XdsClient) setNonce(resType XdsType, nonce string) {
+ client.ReqCaches[resType].Nonce = nonce
+}
+
+//CDS s the Clsuter Discovery Service API, which fetches all the clusters from istio pilot
+func (client *XdsClient) CDS() ([]apiv2.Cluster, error) {
+ adsResClient, conn, err := getAdsResClient(client)
+ if err != nil {
+ return nil, err
+ }
+ defer conn.Close()
+
+ req := &apiv2.DiscoveryRequest{
+ TypeUrl: "type.googleapis.com/envoy.api.v2.Cluster",
+ VersionInfo: client.getVersionInfo(TypeCds),
+ ResponseNonce: client.getNonce(TypeCds),
+ }
+ req.Node = &apiv2core.Node{
+ // Sample taken from istio: router~172.30.77.6~istio-egressgateway-84b4d947cd-rqt45.istio-system~istio-system.svc.cluster.local-2
+ // The Node.Id should be in format {nodeType}~{ipAddr}~{serviceId~{domain}, splitted by '~'
+ // The format is required by pilot
+ Id: client.NodeID,
+ Cluster: client.NodeCluster,
+ }
+
+ if err := adsResClient.Send(req); err != nil {
+ return nil, err
+ }
+
+ resp, err := adsResClient.Recv()
+ if err != nil {
+ return nil, err
+ }
+
+ client.setNonce(TypeCds, resp.GetNonce())
+ client.setVersionInfo(TypeCds, resp.GetVersionInfo())
+ resources := resp.GetResources()
+
+ var cluster apiv2.Cluster
+ clusters := []apiv2.Cluster{}
+ for _, res := range resources {
+ if err := proto.Unmarshal(res.GetValue(), &cluster); err != nil {
+ openlogging.GetLogger().Warnf("Failed to unmarshal cluster resource: %s", err.Error())
+ } else {
+ clusters = append(clusters, cluster)
+ }
+ }
+ return clusters, nil
+}
+
+//EDS is the Endpoint Discovery Service API, the API takes the cluster's name and return all its endpoints(which provide address and port)
+func (client *XdsClient) EDS(clusterName string) (*apiv2.ClusterLoadAssignment, error) {
+ adsResClient, conn, err := getAdsResClient(client)
+ if err != nil {
+ return nil, err
+ }
+ defer conn.Close()
+
+ req := &apiv2.DiscoveryRequest{
+ TypeUrl: "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
+ VersionInfo: client.getVersionInfo(TypeEds),
+ ResponseNonce: client.getNonce(TypeEds),
+ }
+
+ req.Node = &apiv2core.Node{
+ Id: client.NodeID,
+ Cluster: client.NodeCluster,
+ }
+ req.ResourceNames = []string{clusterName}
+ if err := adsResClient.Send(req); err != nil {
+ return nil, err
+ }
+
+ resp, err := adsResClient.Recv()
+ if err != nil {
+ return nil, err
+ }
+
+ resources := resp.GetResources()
+ client.setNonce(TypeEds, resp.GetNonce())
+ client.setVersionInfo(TypeEds, resp.GetVersionInfo())
+
+ var loadAssignment apiv2.ClusterLoadAssignment
+ var e error
+ // endpoints := []apiv2.ClusterLoadAssignment{}
+
+ for _, res := range resources {
+ if err := proto.Unmarshal(res.GetValue(), &loadAssignment); err != nil {
+ e = err
+ } else {
+ // The cluster's LoadAssignment will always be ONE, with Endpoints as its field
+ break
+ }
+ }
+ return &loadAssignment, e
+}
+
+//GetEndpointsByTags fetches the cluster's endpoints with tags. The tags is usually specified in a DestinationRule.
+func (client *XdsClient) GetEndpointsByTags(serviceName string, tags map[string]string) ([]apiv2endpoint.LbEndpoint, string, error) {
+ clusters, err := client.CDS()
+ if err != nil {
+ return nil, "", err
+ }
+
+ lbendpoints := []apiv2endpoint.LbEndpoint{}
+ clusterName := ""
+ for _, cluster := range clusters {
+ clusterInfo := ParseClusterName(cluster.Name)
+ if clusterInfo == nil || clusterInfo.Subset == "" || clusterInfo.ServiceName != serviceName {
+ continue
+ }
+ // So clusterInfo is not nil and subset is not empty
+ if subsetTags, err := client.GetSubsetTags(clusterInfo.Namespace, clusterInfo.ServiceName, clusterInfo.Subset); err == nil {
+ // filter with tags
+ matched := true
+ for k, v := range tags {
+ if subsetTagValue, exists := subsetTags[k]; exists == false || subsetTagValue != v {
+ matched = false
+ break
+ }
+ }
+
+ if matched { // We got the cluster!
+ clusterName = cluster.Name
+ loadAssignment, err := client.EDS(cluster.Name)
+ if err != nil {
+ return nil, clusterName, err
+ }
+
+ for _, item := range loadAssignment.Endpoints {
+ lbendpoints = append(lbendpoints, item.LbEndpoints...)
+ }
+
+ return lbendpoints, clusterName, nil
+ }
+ }
+ }
+
+ return lbendpoints, clusterName, nil
+}
+
+//RDS is the Router Discovery Service API, it returns the virtual hosts which contains Routes
+func (client *XdsClient) RDS(clusterName string) ([]apiv2route.VirtualHost, error) {
+ clusterInfo := ParseClusterName(clusterName)
+ if clusterInfo == nil {
+ return nil, fmt.Errorf("Invalid clusterName for routers: %s", clusterName)
+ }
+
+ adsResClient, conn, err := getAdsResClient(client)
+ if err != nil {
+ return nil, err
+ }
+ defer conn.Close()
+
+ req := &apiv2.DiscoveryRequest{
+ TypeUrl: "type.googleapis.com/envoy.api.v2.RouteConfiguration",
+ VersionInfo: client.getVersionInfo(TypeRds),
+ ResponseNonce: client.getNonce(TypeRds),
+ }
+
+ req.Node = &apiv2core.Node{
+ Id: client.NodeID,
+ Cluster: client.NodeCluster,
+ }
+ req.ResourceNames = []string{clusterName}
+ if err := adsResClient.Send(req); err != nil {
+ return nil, err
+ }
+
+ resp, err := adsResClient.Recv()
+ if err != nil {
+ return nil, err
+ }
+
+ resources := resp.GetResources()
+ client.setNonce(TypeRds, resp.GetNonce())
+ client.setVersionInfo(TypeRds, resp.GetVersionInfo())
+
+ var route apiv2.RouteConfiguration
+ virtualHosts := []apiv2route.VirtualHost{}
+
+ for _, res := range resources {
+ if err := proto.Unmarshal(res.GetValue(), &route); err != nil {
+ openlogging.GetLogger().Warnf("Failed to unmarshal router resource: ", err.Error())
+ } else {
+ vhosts := route.GetVirtualHosts()
+ for _, vhost := range vhosts {
+ if vhost.Name == clusterInfo.ServiceName+":"+clusterInfo.Port {
+ virtualHosts = append(virtualHosts, vhost)
+ }
+ }
+ }
+ }
+ return virtualHosts, nil
+}
+
+//LDS is the Listener Discovery Service API, which returns all the listerns
+func (client *XdsClient) LDS() ([]apiv2.Listener, error) {
+ adsResClient, conn, err := getAdsResClient(client)
+ if err != nil {
+ return nil, err
+ }
+ defer conn.Close()
+
+ req := &apiv2.DiscoveryRequest{
+ TypeUrl: "type.googleapis.com/envoy.api.v2.Listener",
+ VersionInfo: client.getVersionInfo(TypeLds),
+ ResponseNonce: client.getNonce(TypeLds),
+ }
+
+ req.Node = &apiv2core.Node{
+ Id: client.NodeID,
+ Cluster: client.NodeCluster,
+ }
+ if err := adsResClient.Send(req); err != nil {
+ return nil, err
+ }
+
+ resp, err := adsResClient.Recv()
+ if err != nil {
+ return nil, err
+ }
+
+ resources := resp.GetResources()
+ client.setNonce(TypeLds, resp.GetNonce())
+ client.setVersionInfo(TypeLds, resp.GetVersionInfo())
+
+ var listener apiv2.Listener
+ listeners := []apiv2.Listener{}
+
+ for _, res := range resources {
+ if err := proto.Unmarshal(res.GetValue(), &listener); err != nil {
+ openlogging.GetLogger().Warnf("Failed to unmarshal listener resource: ", err.Error())
+ } else {
+ listeners = append(listeners, listener)
+ }
+ }
+ return listeners, nil
+}
+
+//ParseClusterName parse the cluster's name, which is in the format direction|port|subset|hostName, the 4 items will be parsed into different fields. The hostName item will also be parsed into ServcieName, Namespace etc.
+func ParseClusterName(clusterName string) *XdsClusterInfo {
+ // clusterName format: direction|port|subset|hostName
+ // hostName format: |svc.namespace.svc.cluster.local
+
+ parts := strings.Split(clusterName, "|")
+ if len(parts) != 4 {
+ return nil
+ }
+
+ hostnameParts := strings.Split(parts[3], ".")
+ if len(hostnameParts) < 2 {
+ return nil
+ }
+
+ cluster := &XdsClusterInfo{
+ Direction: parts[0],
+ Port: parts[1],
+ Subset: parts[2],
+ HostName: parts[3],
+ ServiceName: hostnameParts[0],
+ Namespace: hostnameParts[1],
+ DomainSuffix: strings.Join(hostnameParts[2:], "."),
+ ClusterName: clusterName,
+ }
+
+ return cluster
+}
diff --git a/pkg/infras/istio/xds_test.go b/pkg/infras/istio/xds_test.go
new file mode 100644
index 0000000..be3c1e1
--- /dev/null
+++ b/pkg/infras/istio/xds_test.go
@@ -0,0 +1,248 @@
+package pilotv2
+
+import (
+ "os"
+ "os/user"
+ "testing"
+ "time"
+
+ apiv2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+ "github.com/go-chassis/go-chassis/core/lager"
+ "github.com/go-chassis/go-chassis/pkg/util/iputil"
+ testutil "github.com/go-mesh/mesher-tools/test/util"
+ "istio.io/istio/tests/util"
+)
+
+const (
+ TEST_POD_NAME = "testpod"
+ NAMESPACE_DEFAULT = "default"
+)
+
+var (
+ ValidXdsClient *XdsClient
+ TestClusters []apiv2.Cluster
+)
+var (
+ KubeConfig string
+ ValidPilotAddr string
+ LocalIPAddress string
+ nodeInfo *NodeInfo
+
+ err error
+)
+
+func TestMain(t *testing.T) {
+ lager.Initialize("", "DEBUG", "", "size", true, 1, 10, 7)
+ // Get kube config path and local ip
+ if KUBE_CONFIG := os.Getenv("KUBE_CONFIG"); KUBE_CONFIG != "" {
+ KubeConfig = KUBE_CONFIG
+ } else {
+ usr, err := user.Current()
+ if err != nil {
+ panic("Failed to get current user info: " + err.Error())
+ } else {
+ KubeConfig = usr.HomeDir + "/" + ".kube/config"
+ }
+ }
+
+ if PILOT_ADDR := os.Getenv("PILOT_ADDR"); PILOT_ADDR != "" {
+ ValidPilotAddr = PILOT_ADDR
+ } else {
+ // panic("PILOT_ADDR should be specified to pass the pilot address")
+ testutil.InitLocalPilotTestEnv(t)
+ ValidPilotAddr = util.MockPilotGrpcAddr
+ }
+
+ if INSTANCE_IP := os.Getenv("INSTANCE_IP"); INSTANCE_IP != "" {
+ LocalIPAddress = INSTANCE_IP
+ } else if LocalIPAddress = iputil.GetLocalIP(); LocalIPAddress == "" {
+ panic("Failed to get the local ip address, please check the network environment")
+ }
+
+ nodeInfo = &NodeInfo{
+ PodName: TEST_POD_NAME,
+ Namespace: NAMESPACE_DEFAULT,
+ InstanceIP: LocalIPAddress,
+ }
+}
+
+func TestNewXdsClient(t *testing.T) {
+ client, err := NewXdsClient(ValidPilotAddr, nil, nodeInfo, KubeConfig)
+
+ if err != nil {
+ t.Errorf("Failed to create xds client: %s", err.Error())
+ }
+
+ ValidXdsClient = client
+}
+
+func TestCDS(t *testing.T) {
+ clusters, err := ValidXdsClient.CDS()
+ if err != nil {
+ t.Errorf("Failed to get clusters by CDS: %s", err.Error())
+ }
+
+ t.Logf("Got %d clusters\n", len(clusters))
+ TestClusters = clusters
+}
+
+func TestEDS(t *testing.T) {
+ if len(TestClusters) == 0 { // With istio, there should always be clusters
+ t.Errorf("No clusters found")
+ }
+
+ loadAssignment, err := ValidXdsClient.EDS(TestClusters[0].Name)
+ if err != nil {
+ t.Errorf("Failed to get endpoints by EDS: %s", err.Error())
+ }
+
+ if loadAssignment == nil {
+ t.Errorf("Failed to get load assginment with EDS: %s", err.Error())
+ }
+}
+
+func TestRDS(t *testing.T) {
+ targetClusterName := ""
+ for _, c := range TestClusters {
+ info := ParseClusterName(c.Name)
+ if info != nil {
+ targetClusterName = c.Name
+ break
+ }
+ }
+
+ if targetClusterName == "" {
+ t.Log("We don't find a valid cluster")
+ }
+
+ _, err := ValidXdsClient.RDS(targetClusterName)
+ if err != nil {
+ t.Errorf("Failed to get routers: %s", err.Error())
+ }
+}
+
+func TestLDS(t *testing.T) {
+ listeners, err := ValidXdsClient.LDS()
+ if err != nil {
+ t.Errorf("Failed to get listeners with LDS: %s", err.Error())
+ }
+
+ t.Logf("%d listeners found\n", len(listeners))
+}
+
+func TestNonce(t *testing.T) {
+ nowStr := time.Now().String()
+ ValidXdsClient.setNonce(TypeCds, nowStr)
+ ValidXdsClient.setNonce(TypeEds, nowStr)
+ ValidXdsClient.setNonce(TypeRds, nowStr)
+ ValidXdsClient.setNonce(TypeLds, nowStr)
+
+ cdsNonce := ValidXdsClient.getNonce(TypeCds)
+ if cdsNonce != nowStr {
+ t.Errorf("Failed to test nonce: %s should be equal to %s", cdsNonce, nowStr)
+ }
+
+ edsNonce := ValidXdsClient.getNonce(TypeEds)
+ if edsNonce != nowStr {
+ t.Errorf("Failed to test nonce: %s should be equal to %s", edsNonce, nowStr)
+ }
+
+ ldsNonce := ValidXdsClient.getNonce(TypeLds)
+ if ldsNonce != nowStr {
+ t.Errorf("Failed to test nonce: %s should be equal to %s", ldsNonce, nowStr)
+ }
+
+ rdsNonce := ValidXdsClient.getNonce(TypeRds)
+ if rdsNonce != nowStr {
+ t.Errorf("Failed to test nonce: %s should be equal to %s", rdsNonce, nowStr)
+ }
+}
+
+func TestVersionInfo(t *testing.T) {
+ nowStr := time.Now().String()
+ ValidXdsClient.setVersionInfo(TypeCds, nowStr)
+ ValidXdsClient.setVersionInfo(TypeEds, nowStr)
+ ValidXdsClient.setVersionInfo(TypeRds, nowStr)
+ ValidXdsClient.setVersionInfo(TypeLds, nowStr)
+
+ cdsVersionInfo := ValidXdsClient.getVersionInfo(TypeCds)
+ if cdsVersionInfo != nowStr {
+ t.Errorf("Failed to test VersionInfo: %s should be equal to %s", cdsVersionInfo, nowStr)
+ }
+
+ edsVersionInfo := ValidXdsClient.getVersionInfo(TypeEds)
+ if edsVersionInfo != nowStr {
+ t.Errorf("Failed to test VersionInfo: %s should be equal to %s", edsVersionInfo, nowStr)
+ }
+
+ ldsVersionInfo := ValidXdsClient.getVersionInfo(TypeLds)
+ if ldsVersionInfo != nowStr {
+ t.Errorf("Failed to test VersionInfo: %s should be equal to %s", ldsVersionInfo, nowStr)
+ }
+
+ rdsVersionInfo := ValidXdsClient.getVersionInfo(TypeRds)
+ if rdsVersionInfo != nowStr {
+ t.Errorf("Failed to test VersionInfo: %s should be equal to %s", rdsVersionInfo, nowStr)
+ }
+}
+
+func TestGetSubsetTags(t *testing.T) {
+ var targetClusterInfo *XdsClusterInfo = nil
+ for _, c := range TestClusters {
+ if info := ParseClusterName(c.Name); info != nil && info.Subset != "" {
+ targetClusterInfo = info
+ break
+ }
+ }
+
+ if targetClusterInfo == nil {
+ t.Log("No tagged services in test environment, skip")
+ } else {
+ tags, err := ValidXdsClient.GetSubsetTags(targetClusterInfo.Namespace, targetClusterInfo.ServiceName, targetClusterInfo.Subset)
+ if err != nil {
+ t.Errorf("Failed to get subset tags: %s", err.Error())
+ } else if len(tags) == 0 {
+ t.Logf("Should not return empty tags %s", targetClusterInfo.ClusterName)
+ }
+ }
+}
+
+func TestGetAdsResClient(t *testing.T) {
+ _, conn, err := getAdsResClient(ValidXdsClient)
+
+ if err != nil {
+ t.Errorf("Failed to get ads resource client: %s", err.Error())
+ }
+ conn.Close()
+}
+
+func TestParseClusterName(t *testing.T) {
+ validClusterName := "inbound|3030||client.default.svc.cluster.local"
+
+ clusterInfo := ParseClusterName(validClusterName)
+
+ if clusterInfo == nil {
+ t.Errorf("Failed to parse cluster name: %s, should return cluster info", validClusterName)
+ }
+ if clusterInfo.Direction != "inbound" {
+ t.Errorf("Failed to parse cluster name: %s, direction should be inbound", validClusterName)
+ }
+ if clusterInfo.Port != "3030" {
+ t.Errorf("Failed to parse cluster name: %s, port should be 3030", validClusterName)
+ }
+ if clusterInfo.ServiceName != "client" {
+ t.Errorf("Failed to parse cluster name: %s, servicename should be client", validClusterName)
+ }
+
+ invalidClusterName := "BlackHoleCluster"
+ clusterInfo = ParseClusterName(invalidClusterName)
+ if clusterInfo != nil {
+ t.Errorf("Failed to parse cluster name: %s, should return nil", validClusterName)
+ }
+
+ invalidClusterName = "outbound|9080|v2|black"
+ clusterInfo = ParseClusterName(invalidClusterName)
+ if clusterInfo != nil {
+ t.Errorf("Failed to parse cluster name: %s, should return nil", validClusterName)
+ }
+}
diff --git a/pkg/infras/k8s/k8s.go b/pkg/infras/k8s/k8s.go
new file mode 100644
index 0000000..998d652
--- /dev/null
+++ b/pkg/infras/k8s/k8s.go
@@ -0,0 +1,60 @@
+package pilotv2
+
+import (
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/runtime/serializer"
+ "k8s.io/client-go/rest"
+ "k8s.io/client-go/tools/clientcmd"
+)
+
+//DestinationRuleResult is the list of MinDestinationRules
+type DestinationRuleResult struct {
+ Items []MinDestinationRule
+}
+
+// MinDestinationRule is the minimum structure we need to get subsets
+type MinDestinationRule struct {
+ Metadata struct {
+ Name string `json:"name"`
+ Namespace string `json:"namespace"`
+ } `json:"metadata"`
+ Spec struct {
+ Host string `json:"host"`
+ Subsets []struct {
+ Labels map[string]string `json:"labels"`
+ Name string `json:"name"`
+ } `json:"subsets"`
+ } `json:"spec"`
+}
+
+//CreateK8SRestClient returns the kubernetes client for RESTful API calls
+func CreateK8SRestClient(kubeconfig, apiPath, group, version string) (*rest.RESTClient, error) {
+ var config *rest.Config
+ var err error
+ if kubeconfig != "" {
+ config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
+ if err != nil {
+ config, err = rest.InClusterConfig()
+ }
+ } else {
+ config, err = rest.InClusterConfig()
+ }
+
+ if err != nil {
+ return nil, err
+ }
+
+ config.APIPath = apiPath
+ config.GroupVersion = &schema.GroupVersion{
+ Group: group,
+ Version: version,
+ }
+ config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: serializer.NewCodecFactory(runtime.NewScheme())}
+
+ k8sRestClient, err := rest.RESTClientFor(config)
+ if err != nil {
+ return nil, err
+ }
+ return k8sRestClient, nil
+}
diff --git a/pkg/infras/k8s/k8s_test.go b/pkg/infras/k8s/k8s_test.go
new file mode 100644
index 0000000..3a6992c
--- /dev/null
+++ b/pkg/infras/k8s/k8s_test.go
@@ -0,0 +1,42 @@
+package pilotv2
+
+import (
+ "os"
+ "os/user"
+ "testing"
+)
+
+var KubeConfig string
+
+func init() {
+ if KUBE_CONFIG := os.Getenv("KUBE_CONFIG"); KUBE_CONFIG != "" {
+ KubeConfig = KUBE_CONFIG
+ } else {
+ usr, err := user.Current()
+ if err != nil {
+ panic("Failed to get current user info: " + err.Error())
+ } else {
+ KubeConfig = usr.HomeDir + "/" + ".kube/config"
+ }
+ }
+
+}
+
+func TestCreateK8sClient(t *testing.T) {
+ _, err := CreateK8SRestClient(KubeConfig, "apis", "networking.istio.io", "v1alpha3")
+ if err != nil {
+ t.Errorf("Failed to create k8s rest client: %s", err.Error())
+ }
+
+ _, err = CreateK8SRestClient("*nonfile", "apis", "networking.istio.io", "v1alpha3")
+ if err == nil {
+ t.Errorf("Test failed, should return error with invalid kube config path")
+ }
+}
+
+func TestCreateInvalidK8sClient(t *testing.T) {
+ _, err := CreateK8SRestClient("", "apis", "networking.istio.io", "v1alpha3")
+ if err == nil {
+ t.Errorf("Passing a nil config for k8s client should return error")
+ }
+}
diff --git a/plugins/registry/istiov2/cache.go b/plugins/registry/istiov2/cache.go
new file mode 100644
index 0000000..ec05328
--- /dev/null
+++ b/plugins/registry/istiov2/cache.go
@@ -0,0 +1,286 @@
+package istiov2
+
+import (
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ apiv2endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
+ istioinfra "github.com/go-mesh/mesher/pkg/infras/istio"
+
+ "github.com/go-chassis/go-chassis/core/archaius"
+ "github.com/go-chassis/go-chassis/core/common"
+ "github.com/go-chassis/go-chassis/core/config"
+ "github.com/go-chassis/go-chassis/core/lager"
+ "github.com/go-chassis/go-chassis/core/registry"
+)
+
+const (
+ DefaultRefreshInterval = time.Second * 30
+)
+
+var simpleCache *EndpointCache
+
+func init() {
+ simpleCache = &EndpointCache{
+ cache: map[string]EndpointSubset{},
+ }
+}
+
+//CacheManager manages the caches for istio pilot results.
+type CacheManager struct {
+ xdsClient *istioinfra.XdsClient
+}
+
+//AutoSync fetches and updates the cluster and endpoint info periodically
+func (cm *CacheManager) AutoSync() {
+ cm.refreshCache()
+
+ var ticker *time.Ticker
+ refreshInterval := config.GetServiceDiscoveryRefreshInterval()
+ if refreshInterval == "" {
+ ticker = time.NewTicker(DefaultRefreshInterval)
+ } else {
+ timeValue, err := time.ParseDuration(refreshInterval)
+ if err != nil {
+ lager.Logger.Errorf("refeshInterval is invalid. So use Default value: %s", err.Error())
+ timeValue = DefaultRefreshInterval
+ }
+
+ ticker = time.NewTicker(timeValue)
+ }
+ go func() {
+ for range ticker.C {
+ cm.refreshCache()
+ }
+ }()
+}
+
+func (cm *CacheManager) refreshCache() {
+ // TODO What is the design of autodiscovery
+ if archaius.GetBool("cse.service.registry.autodiscovery", false) {
+ lager.Logger.Errorf("SyncPilotEndpoints failed: not supported")
+ }
+
+ err := cm.pullMicroserviceInstance()
+ if err != nil {
+ lager.Logger.Errorf("AutoUpdateMicroserviceInstance failed: %s", err.Error())
+ }
+
+ if archaius.GetBool("cse.service.registry.autoSchemaIndex", false) {
+ lager.Logger.Errorf("MakeSchemaIndex failed: Not support operation")
+ }
+
+ if archaius.GetBool("cse.service.registry.autoIPIndex", false) {
+ err = cm.MakeIPIndex()
+ if err != nil {
+ lager.Logger.Errorf("Auto Update IP index failed: %s", err.Error())
+ }
+ }
+}
+
+func (cm *CacheManager) pullMicroserviceInstance() error {
+ clusterInfos, err := cm.getClusterInfos()
+ if err != nil {
+ return err
+ }
+
+ for _, clusterInfo := range clusterInfos {
+ if clusterInfo.Subset != "" {
+ // Update the cache
+ instances := []*registry.MicroServiceInstance{}
+ for _, addr := range clusterInfo.Addrs {
+ msi := ®istry.MicroServiceInstance{}
+ msi.InstanceID = strings.Replace(addr, ":", "_", 0)
+ msi.HostName = clusterInfo.ClusterName
+ msi.EndpointsMap = map[string]string{
+ common.ProtocolRest: addr,
+ }
+ msi.DefaultEndpoint = addr
+ msi.DefaultProtocol = common.ProtocolRest
+ msi.Metadata = clusterInfo.Tags
+
+ instances = append(instances, msi)
+ }
+
+ endpointSubset := EndpointSubset{
+ tags: clusterInfo.Tags,
+ instances: instances,
+ subsetName: clusterInfo.Subset,
+ }
+ simpleCache.Set(clusterInfo.ClusterName, endpointSubset)
+ }
+ }
+
+ return nil
+}
+
+//MakeIPIndex caches the cluster info with address as the key
+func (cm *CacheManager) MakeIPIndex() error {
+ // TODO Use getClusterInfo to replace the logic
+ clusterInfos, err := cm.getClusterInfos()
+ if err != nil {
+ return err
+ }
+
+ for _, clusterInfo := range clusterInfos {
+ for _, addr := range clusterInfo.Addrs {
+ si := ®istry.SourceInfo{}
+ // TODO Get tags by subset and put them into si.Tags
+ si.Name = clusterInfo.ClusterName
+ si.Tags = clusterInfo.Tags
+ registry.SetIPIndex(addr, si)
+ // TODO Why don't we have to index every endpoint?
+ // break
+ }
+ }
+
+ return nil
+}
+
+//NewCacheManager creates the CacheManager instance.
+func NewCacheManager(xdsClient *istioinfra.XdsClient) (*CacheManager, error) {
+ cacheManager := &CacheManager{
+ xdsClient: xdsClient,
+ }
+
+ return cacheManager, nil
+}
+
+func (cm *CacheManager) getClusterInfos() ([]istioinfra.XdsClusterInfo, error) {
+ clusterInfos := []istioinfra.XdsClusterInfo{}
+
+ clusters, err := cm.xdsClient.CDS()
+ if err != nil {
+ return nil, err
+ }
+
+ for _, cluster := range clusters {
+ // xDS v2 API: CDS won't obtain the cluster's endpoints, call EDS to get the endpoints
+
+ clusterInfo := istioinfra.ParseClusterName(cluster.Name)
+ if clusterInfo == nil {
+ continue
+ }
+
+ // Get Tags
+ if clusterInfo.Subset != "" { // Only clusters with subset contain labels
+ if tags, err := cm.xdsClient.GetSubsetTags(clusterInfo.Namespace, clusterInfo.ServiceName, clusterInfo.Subset); err == nil {
+ clusterInfo.Tags = tags
+ }
+ }
+
+ // Get cluster instances' addresses
+ loadAssignment, err := cm.xdsClient.EDS(cluster.Name)
+ if err != nil {
+ return nil, err
+ }
+ endpoints := loadAssignment.Endpoints
+ for _, endpoint := range endpoints {
+ for _, lbendpoint := range endpoint.LbEndpoints {
+ socketAddress := lbendpoint.Endpoint.Address.GetSocketAddress()
+ addr := socketAddress.GetAddress()
+ port := socketAddress.GetPortValue()
+ ipAddr := addr + ":" + strconv.FormatUint(uint64(port), 10)
+ clusterInfo.Addrs = append(clusterInfo.Addrs, ipAddr)
+ }
+ }
+
+ clusterInfos = append(clusterInfos, *clusterInfo)
+ }
+ return clusterInfos, nil
+}
+
+// TODO Cache with registry index cache
+func updateInstanceIndexCache(lbendpoints []apiv2endpoint.LbEndpoint, clusterName string, tags map[string]string) {
+ if len(lbendpoints) == 0 {
+ simpleCache.Delete(clusterName)
+ return
+ }
+
+ instances := make([]*registry.MicroServiceInstance, 0, len(lbendpoints))
+ for _, lbendpoint := range lbendpoints {
+ msi := toMicroServiceInstance(clusterName, &lbendpoint, tags)
+ instances = append(instances, msi)
+ }
+ subset := EndpointSubset{
+ tags: tags,
+ instances: instances,
+ }
+
+ info := istioinfra.ParseClusterName(clusterName)
+ if info != nil {
+ subset.subsetName = info.Subset
+ }
+
+ simpleCache.Set(clusterName, subset)
+}
+
+//EndpointCache caches the clusters' endpoint and tags
+type EndpointCache struct {
+ mux sync.Mutex
+ cache map[string]EndpointSubset
+}
+
+//EndpointSubset stores the tags and instances of a service
+type EndpointSubset struct {
+ subsetName string
+ tags map[string]string
+ instances []*registry.MicroServiceInstance
+}
+
+//Delete removes the cached instances of the specified cluster
+func (c *EndpointCache) Delete(clusterName string) {
+ c.mux.Lock()
+ delete(c.cache, clusterName)
+ c.mux.Unlock()
+}
+
+//Set updates the cluster's instance info
+func (c *EndpointCache) Set(clusterName string, subset EndpointSubset) {
+ c.mux.Lock()
+ c.cache[clusterName] = subset
+ c.mux.Unlock()
+}
+
+//GetWithTags returns the instances of the service, filtered with tags
+func (c *EndpointCache) GetWithTags(serviceName string, tags map[string]string) []*registry.MicroServiceInstance {
+ // Get subsets whose clusterName matches the service name
+ matchedSubsets := []EndpointSubset{}
+ c.mux.Lock()
+ for clusterName, subset := range c.cache {
+ info := istioinfra.ParseClusterName(clusterName)
+ if info != nil && info.ServiceName == serviceName {
+ matchedSubsets = append(matchedSubsets, subset)
+ }
+ }
+ c.mux.Unlock()
+
+ if len(matchedSubsets) == 0 {
+ return nil
+ }
+
+ var instances []*registry.MicroServiceInstance
+
+ for _, subset := range matchedSubsets {
+ if tagsMatch(subset.tags, tags) {
+ instances = subset.instances
+ break
+ }
+
+ }
+ return instances
+}
+
+// TODO There might be some utils in go-chassis doing the same thing
+func tagsMatch(tags, targetTags map[string]string) bool {
+ matched := true
+ for k, v := range targetTags {
+ if val, exists := tags[k]; !exists || val != v {
+ matched = false
+ break
+ }
+ }
+ return matched
+}
diff --git a/plugins/registry/istiov2/cache_test.go b/plugins/registry/istiov2/cache_test.go
new file mode 100644
index 0000000..6eedd34
--- /dev/null
+++ b/plugins/registry/istiov2/cache_test.go
@@ -0,0 +1,126 @@
+package istiov2
+
+import (
+ "os"
+ "os/user"
+ "testing"
+
+ "github.com/go-chassis/go-chassis/core/lager"
+ "github.com/go-chassis/go-chassis/core/registry"
+ "github.com/go-chassis/go-chassis/pkg/util/iputil"
+ testutil "github.com/go-mesh/mesher-tools/test/util"
+ istioinfra "github.com/go-mesh/mesher/pkg/infras/istio"
+ "istio.io/istio/tests/util"
+)
+
+const (
+ TEST_POD_NAME = "testpod"
+ NAMESPACE_DEFAULT = "default"
+)
+
+var (
+ KubeConfig string
+ ValidPilotAddr string
+ LocalIPAddress string
+ nodeInfo *istioinfra.NodeInfo
+
+ testXdsClient *istioinfra.XdsClient
+ testCacheManager *CacheManager
+ err error
+)
+
+func TestMain(t *testing.T) {
+ lager.Initialize("", "DEBUG", "", "size", true, 1, 10, 7)
+ // Get kube config path and local ip
+ if KUBE_CONFIG := os.Getenv("KUBE_CONFIG"); KUBE_CONFIG != "" {
+ KubeConfig = KUBE_CONFIG
+ } else {
+ usr, err := user.Current()
+ if err != nil {
+ panic("Failed to get current user info: " + err.Error())
+ } else {
+ KubeConfig = usr.HomeDir + "/" + ".kube/config"
+ }
+ }
+
+ if PILOT_ADDR := os.Getenv("PILOT_ADDR"); PILOT_ADDR != "" {
+ ValidPilotAddr = PILOT_ADDR
+ } else {
+ // panic("PILOT_ADDR should be specified to pass the pilot address")
+ testutil.InitLocalPilotTestEnv(t)
+ ValidPilotAddr = util.MockPilotGrpcAddr
+ }
+
+ if INSTANCE_IP := os.Getenv("INSTANCE_IP"); INSTANCE_IP != "" {
+ LocalIPAddress = INSTANCE_IP
+ } else if LocalIPAddress = iputil.GetLocalIP(); LocalIPAddress == "" {
+ panic("Failed to get the local ip address, please check the network environment")
+ }
+
+ nodeInfo = &istioinfra.NodeInfo{
+ PodName: TEST_POD_NAME,
+ Namespace: NAMESPACE_DEFAULT,
+ InstanceIP: LocalIPAddress,
+ }
+
+ testXdsClient, err = istioinfra.NewXdsClient(ValidPilotAddr, nil, nodeInfo, KubeConfig)
+ if err != nil {
+ panic("Failed to prepare test, xds client creation failed: " + err.Error())
+ }
+}
+
+func TestNewCacheManager(t *testing.T) {
+ testCacheManager, err = NewCacheManager(testXdsClient)
+ if err != nil {
+ t.Errorf("Failed to create CacheManager: %s", err.Error())
+ }
+}
+
+// func TestAutoSync(t *testing.T) {
+// testCacheManager.AutoSync()
+// }
+
+func TestPullImcroserviceInstance(t *testing.T) {
+ err = testCacheManager.pullMicroserviceInstance()
+ if err != nil {
+ t.Errorf("Failed to pull microservice instances: %s", err.Error())
+ }
+}
+
+// func TestMakeIPIndex(t *testing.T) {
+// err := testCacheManager.MakeIPIndex()
+// if err != nil {
+// t.Errorf("Failed to make ip index: %s", err.Error())
+// }
+// }
+
+func TestEndpointCache(t *testing.T) {
+ ec := EndpointCache{
+ cache: map[string]EndpointSubset{},
+ }
+
+ subset := EndpointSubset{
+ subsetName: "foo",
+ tags: map[string]string{},
+ instances: []*registry.MicroServiceInstance{},
+ }
+
+ defer func() {
+ if r := recover(); r != nil {
+ t.Error("should not panic")
+ }
+ }()
+
+ waitChannel := make(chan int)
+ for i := 0; i < 1000; i++ {
+ go func() {
+ ec.Set("foo", subset)
+ waitChannel <- 0
+
+ }()
+ }
+
+ for i := 0; i < 1000; i++ {
+ <-waitChannel
+ }
+}
diff --git a/plugins/registry/istiov2/registry.go b/plugins/registry/istiov2/registry.go
new file mode 100644
index 0000000..624dffc
--- /dev/null
+++ b/plugins/registry/istiov2/registry.go
@@ -0,0 +1,243 @@
+package istiov2
+
+import (
+ "fmt"
+ "log"
+ "os"
+ "strconv"
+ "strings"
+
+ apiv2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+ apiv2endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
+ istioinfra "github.com/go-mesh/mesher/pkg/infras/istio"
+
+ "github.com/go-chassis/go-chassis/core/common"
+ "github.com/go-chassis/go-chassis/core/metadata"
+ "github.com/go-chassis/go-chassis/core/registry"
+ "github.com/go-chassis/go-chassis/pkg/util/iputil"
+ "github.com/go-chassis/go-chassis/pkg/util/tags"
+ "github.com/go-mesh/openlogging"
+)
+
+var (
+ //PodName is the name of the pod that mesher runs in
+ PodName string
+ //PodNamespace is the namespace which the pod belongs to
+ PodNamespace string
+ //InstanceIP is the IP of the pod(the IP of the first network adaptor)
+ InstanceIP string
+)
+
+const (
+ PilotV2Registry = "pilotv2"
+)
+
+//ServiceDiscovery is the discovery service for istio pilot with xDS v2 API
+type ServiceDiscovery struct {
+ Name string
+ client *istioinfra.XdsClient
+ options registry.Options
+}
+
+//GetMicroServiceID returns the id of the micro service
+func (discovery *ServiceDiscovery) GetMicroServiceID(appID, microServiceName, version, env string) (string, error) {
+ return microServiceName, nil
+}
+
+//GetAllMicroServices returns all the micro services, which is mapped from xDS clusters
+func (discovery *ServiceDiscovery) GetAllMicroServices() ([]*registry.MicroService, error) {
+ clusters, err := discovery.client.CDS()
+ if err != nil {
+ return nil, err
+ }
+ microServices := []*registry.MicroService{}
+ for _, cluster := range clusters {
+ microServices = append(microServices, toMicroService(&cluster))
+ }
+ return microServices, nil
+}
+
+func toMicroService(cluster *apiv2.Cluster) *registry.MicroService {
+ svc := ®istry.MicroService{}
+ svc.ServiceID = cluster.Name
+ svc.ServiceName = cluster.Name
+ svc.Version = common.DefaultVersion
+ svc.AppID = common.DefaultApp
+ svc.Level = "BACK"
+ svc.Status = "UP"
+ svc.Framework = ®istry.Framework{
+ Name: "Istio",
+ Version: common.LatestVersion,
+ }
+ svc.RegisterBy = metadata.PlatformRegistrationComponent
+
+ return svc
+}
+
+func toMicroServiceInstance(clusterName string, lbendpoint *apiv2endpoint.LbEndpoint, tags map[string]string) *registry.MicroServiceInstance {
+ socketAddress := lbendpoint.Endpoint.Address.GetSocketAddress()
+ addr := socketAddress.Address
+ port := socketAddress.GetPortValue()
+ portStr := strconv.FormatUint(uint64(port), 10)
+ msi := ®istry.MicroServiceInstance{}
+ msi.InstanceID = addr + "_" + portStr
+ msi.HostName = clusterName
+ msi.DefaultEndpoint = addr + ":" + portStr
+ msi.EndpointsMap = map[string]string{
+ common.ProtocolRest: msi.DefaultEndpoint,
+ }
+ msi.DefaultProtocol = common.ProtocolRest
+ msi.Metadata = tags
+
+ return msi
+}
+
+//GetMicroService returns the micro service info
+func (discovery *ServiceDiscovery) GetMicroService(microServiceID string) (*registry.MicroService, error) {
+ // If the service is in the clusters, return it, or nil
+
+ clusters, err := discovery.client.CDS()
+ if err != nil {
+ return nil, err
+ }
+
+ var targetCluster apiv2.Cluster
+ for _, cluster := range clusters {
+ parts := strings.Split(cluster.Name, "|")
+ if len(parts) < 4 {
+ openlogging.GetLogger().Warnf("Invalid cluster name: %s", cluster.Name)
+ continue
+ }
+
+ svcName := parts[3]
+ if strings.Index(svcName, microServiceID+".") == 0 {
+ targetCluster = cluster
+ break
+ }
+ }
+
+ if &targetCluster == nil {
+ return nil, nil
+ }
+
+ return toMicroService(&targetCluster), nil
+}
+
+//GetMicroServiceInstances returns the instances of the micro service
+func (discovery *ServiceDiscovery) GetMicroServiceInstances(consumerID, providerID string) ([]*registry.MicroServiceInstance, error) {
+ // TODO Handle the registry.MicroserviceIndex cache
+ // TODO Handle the microServiceName
+ service, err := discovery.GetMicroService(providerID)
+ if err != nil {
+ return nil, err
+ }
+
+ loadAssignment, err := discovery.client.EDS(service.ServiceName)
+ if err != nil {
+ return nil, err
+ }
+
+ instances := []*registry.MicroServiceInstance{}
+ endpionts := loadAssignment.Endpoints
+ for _, item := range endpionts {
+ for _, lbendpoint := range item.LbEndpoints {
+ msi := toMicroServiceInstance(loadAssignment.ClusterName, &lbendpoint, nil) // The cluster without subset doesn't have tags
+ instances = append(instances, msi)
+ }
+ }
+
+ return instances, nil
+}
+
+//FindMicroServiceInstances returns the micro service's instances filtered with tags
+func (discovery *ServiceDiscovery) FindMicroServiceInstances(consumerID, microServiceName string, tags utiltags.Tags) ([]*registry.MicroServiceInstance, error) {
+ if tags.KV == nil || tags.Label == "" { // Chassis might pass an empty tags
+ return discovery.GetMicroServiceInstances(consumerID, microServiceName)
+ }
+
+ instances := simpleCache.GetWithTags(microServiceName, tags.KV)
+ if len(instances) == 0 {
+ var lbendpoints []apiv2endpoint.LbEndpoint
+ var err error
+ lbendpoints, clusterName, err := discovery.client.GetEndpointsByTags(microServiceName, tags.KV)
+ if err != nil {
+ return nil, err
+ }
+
+ updateInstanceIndexCache(lbendpoints, clusterName, tags.KV)
+
+ instances = simpleCache.GetWithTags(microServiceName, tags.KV)
+ if instances == nil {
+ return nil, fmt.Errorf("Failed to find microservice instances of %s from cache", microServiceName)
+ }
+ }
+ return instances, nil
+}
+
+var cacheManager *CacheManager
+
+//AutoSync updates the services' info periodically in the background
+func (discovery *ServiceDiscovery) AutoSync() {
+ var err error
+ cacheManager, err = NewCacheManager(discovery.client)
+ if err != nil {
+ openlogging.GetLogger().Errorf("Failed to create cache manager, indexing will not work: %s", err.Error())
+ } else {
+ cacheManager.AutoSync()
+ }
+}
+
+//Close closes the discovery service
+func (discovery *ServiceDiscovery) Close() error {
+ return nil
+}
+
+//NewDiscoveryService creates the new ServiceDiscovery instance
+func NewDiscoveryService(options registry.Options) registry.ServiceDiscovery {
+ if len(options.Addrs) == 0 {
+ panic("Failed to create discovery service: Address not specified")
+ }
+ pilotAddr := options.Addrs[0]
+ nodeInfo := &istioinfra.NodeInfo{
+ PodName: PodName,
+ Namespace: PodNamespace,
+ InstanceIP: InstanceIP,
+ }
+ xdsClient, err := istioinfra.NewXdsClient(pilotAddr, options.TLSConfig, nodeInfo, options.ConfigPath)
+ if err != nil {
+ panic("Failed to create XDS client: " + err.Error())
+ }
+
+ discovery := &ServiceDiscovery{
+ client: xdsClient,
+ Name: PilotV2Registry,
+ options: options,
+ }
+
+ return discovery
+}
+
+func init() {
+ // Init the node info
+ PodName = os.Getenv("POD_NAME")
+ PodNamespace = os.Getenv("POD_NAMESPACE")
+ InstanceIP = os.Getenv("INSTANCE_IP")
+
+ // TODO Handle the default value
+ if PodName == "" {
+ PodName = "pod_name_default"
+ }
+ if PodNamespace == "" {
+ PodNamespace = "default"
+ }
+ if InstanceIP == "" {
+ log.Println("[WARN] Env var INSTANCE_IP not set, try to get instance ip from local network, the service might not work properly.")
+ InstanceIP = iputil.GetLocalIP()
+ if InstanceIP == "" {
+ // Won't work without instance ip
+ panic("Failed to get instance ip")
+ }
+ }
+
+ registry.InstallServiceDiscovery(PilotV2Registry, NewDiscoveryService)
+}
diff --git a/plugins/registry/istiov2/registry_test.go b/plugins/registry/istiov2/registry_test.go
new file mode 100644
index 0000000..6bc0aca
--- /dev/null
+++ b/plugins/registry/istiov2/registry_test.go
@@ -0,0 +1,206 @@
+package istiov2
+
+import (
+ "os"
+ "strconv"
+ "testing"
+
+ apiv2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+ apiv2core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+ apiv2endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
+ istioinfra "github.com/go-mesh/mesher/pkg/infras/istio"
+
+ "github.com/go-chassis/go-chassis/core/registry"
+ "github.com/go-chassis/go-chassis/pkg/util/tags"
+)
+
+var VaildServiceDiscovery registry.ServiceDiscovery
+var AllServices []*registry.MicroService
+
+func TestNewDiscoveryService(t *testing.T) {
+ options := registry.Options{
+ Addrs: []string{ValidPilotAddr},
+ ConfigPath: KubeConfig,
+ }
+
+ // Explicitly set the env vars, though this is checkd in the init of cache_test
+ os.Setenv("POD_NAME", TEST_POD_NAME)
+ os.Setenv("NAMESPACE", NAMESPACE_DEFAULT)
+ os.Setenv("INSTANCE_IP", LocalIPAddress)
+
+ // No panic should happen
+ VaildServiceDiscovery = NewDiscoveryService(options)
+
+}
+
+// func TestAutoSync(t *testing.T) {
+// archaius.Init()
+// VaildServiceDiscovery.AutoSync()
+// }
+
+func TestEmptyPilotAddrs(t *testing.T) {
+ defer func() {
+ if err := recover(); err == nil {
+ t.Errorf("Panic should be caught")
+ }
+ }()
+
+ emptyAddrsOptions := registry.Options{
+ Addrs: []string{},
+ ConfigPath: KubeConfig,
+ }
+ NewDiscoveryService(emptyAddrsOptions)
+}
+
+func TestGetAllMicroServices(t *testing.T) {
+ services, err := VaildServiceDiscovery.GetAllMicroServices()
+ if err != nil {
+ t.Errorf("Failed to get all micro services: %s", err.Error())
+ }
+
+ if len(services) == 0 {
+ t.Log("Warn: no micro services found")
+ }
+
+}
+
+func TestGetMicroServiceID(t *testing.T) {
+ serviceName := "pilotv2server"
+ msID, err := VaildServiceDiscovery.GetMicroServiceID("default", serviceName, "v3", "")
+ if err != nil {
+ t.Errorf("Failed to get micro service id: %s", err.Error())
+ }
+
+ if msID != serviceName {
+ t.Errorf("In pilotv2 discovery, msID should be equal to serviceName(%s != %s)", msID, serviceName)
+ }
+}
+
+func TestGetMicroService(t *testing.T) {
+ serviceName := "istio-pilot"
+ svc, err := VaildServiceDiscovery.GetMicroService(serviceName)
+ if err != nil {
+ t.Errorf("Failed to get micro service: %s", err.Error())
+ }
+ if svc == nil {
+ t.Errorf("istio-pilot service should not be nil")
+ }
+}
+
+func TestGetMicroServiceInstance(t *testing.T) {
+ // serviceName := "istio-pilot"
+ serviceName := "hello"
+ instances, err := VaildServiceDiscovery.GetMicroServiceInstances("pilotv2client", serviceName)
+ if err != nil {
+ t.Errorf("Failed to get micro service instances of istio-pilot: %s", err.Error())
+ }
+ if len(instances) == 0 {
+ t.Errorf("istio-pilot's instances should not be empty")
+ }
+}
+
+func TestFindMicroServiceInstances(t *testing.T) {
+ discovery, ok := VaildServiceDiscovery.(*ServiceDiscovery)
+ if !ok {
+ t.Errorf("Failed to convert discovery into type istiov2.ServiceDiscovery")
+ return
+ }
+ client := discovery.client
+
+ clusters, err := client.CDS()
+ if err != nil {
+ t.Errorf("Failed to teset FindMicroServiceInstances, CDS failed: %s", err.Error())
+ }
+
+ var clusterWithSubset *istioinfra.XdsClusterInfo = nil
+ for _, c := range clusters {
+ if info := istioinfra.ParseClusterName(c.Name); info != nil && info.Subset != "" {
+ clusterWithSubset = info
+ }
+ }
+
+ if clusterWithSubset != nil {
+ // an empty tags will make sure target tag always match
+ emptyTags := utiltags.Tags{
+ KV: map[string]string{},
+ Label: "",
+ }
+ instances, err := VaildServiceDiscovery.FindMicroServiceInstances("pilotv2client", clusterWithSubset.ServiceName, emptyTags)
+ if err != nil {
+ t.Errorf("Failed to FindMicroServiceInstances of %s: %s", clusterWithSubset.ServiceName, err.Error())
+ }
+ if len(instances) == 0 {
+ t.Logf("%s's service instances is empty\n", clusterWithSubset.ServiceName)
+ t.Logf("Pls check if the destinationrule and corresponding pod tags are matching")
+ }
+ } else if len(clusters) != 0 {
+ t.Log("No clusters are with subsets")
+ targetCluster := clusters[0]
+
+ tags := utiltags.Tags{
+ KV: map[string]string{
+ "version": "v1",
+ },
+ Label: "version=v1",
+ }
+ _, err := VaildServiceDiscovery.FindMicroServiceInstances("pilotv2client", targetCluster.Name, tags)
+ if err == nil {
+ t.Errorf("Should caught error to get the endpoints of cluster without tags")
+ }
+ }
+
+}
+
+func TestToMicroService(t *testing.T) {
+ cluster := &apiv2.Cluster{
+ Name: "pilotv2server",
+ }
+
+ svc := toMicroService(cluster)
+
+ if svc.ServiceID != cluster.Name {
+ t.Errorf("service id should be equal to cluster name(%s != %s)", svc.ServiceID, cluster.Name)
+ }
+}
+
+func TestToMicroServiceInstance(t *testing.T) {
+ lbendpoint := &apiv2endpoint.LbEndpoint{
+ Endpoint: &apiv2endpoint.Endpoint{
+ Address: &apiv2core.Address{
+ Address: &apiv2core.Address_SocketAddress{
+ SocketAddress: &apiv2core.SocketAddress{
+ Address: "192.168.0.10:8822",
+ },
+ },
+ },
+ },
+ }
+ clusterName := "pilotv2server"
+ tags := map[string]string{
+ "version": "v1",
+ }
+ msi := toMicroServiceInstance(clusterName, lbendpoint, tags)
+
+ socketAddr := lbendpoint.Endpoint.Address.GetSocketAddress()
+ addr := socketAddr.GetAddress()
+ port := socketAddr.GetPortValue()
+
+ if msi.InstanceID != addr+"_"+strconv.FormatUint(uint64(port), 10) {
+ t.Errorf("Invalid msi.InstanceID: %s should be equal to %s_%d", msi.InstanceID, addr, port)
+ }
+
+ if msi.HostName != clusterName {
+ t.Errorf("Invalid msi.HostName: %s should be equal to %s", msi.HostName, clusterName)
+ }
+
+ // Test if the tags match
+ if !tagsMatch(tags, msi.Metadata) {
+ t.Errorf("Tags not match, %v should be subset of %s", tags, msi.Metadata)
+ }
+}
+
+func TestClose(t *testing.T) {
+ if err := VaildServiceDiscovery.Close(); err != nil {
+ t.Error(err)
+ }
+}