Merge pull request #1 from crystaldust/feature/istiopilotv2-discovery

Feature:istiopilotv2 discovery
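
This change adds a pilotv2 service discovery plugin that talks to Istio Pilot over the Envoy xDS v2 gRPC APIs (CDS/EDS/LDS/RDS), a small Kubernetes REST client used to resolve DestinationRule subsets into instance tags, and a cache manager that periodically syncs cluster and endpoint information into the registry cache.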
diff --git a/.gitignore b/.gitignore
index 6a5366d..e50b616 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,4 +28,4 @@
 
 _build
 coverage.txt
-go.sum
\ No newline at end of file
+go.sum
diff --git a/.travis.yml b/.travis.yml
index bf63989..4f90ccd 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,6 +8,7 @@
   - mkdir -p $HOME/gopath/src/github.com/go-mesh/mesher
   - rsync -az ${TRAVIS_BUILD_DIR}/ $HOME/gopath/src/github.com/go-mesh/mesher
   - export TRAVIS_BUILD_DIR=$HOME/gopath/src/github.com/go-mesh/mesher
+  - export KUBE_CONFIG=$HOME/gopath/src/github.com/go-mesh/mesher/vendor/github.com/go-mesh/mesher-tools/test/util/sample_kubeconfig
   - cd $HOME/gopath/src/github.com/go-mesh/mesher
 jobs:
   include:
diff --git a/go.mod b/go.mod
index 46092d8..bbadbc6 100644
--- a/go.mod
+++ b/go.mod
@@ -1,27 +1,79 @@
 module github.com/go-mesh/mesher
 
-replace (
-	golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac => github.com/golang/crypto v0.0.0-20180820150726-614d502a4dac
-	golang.org/x/net v0.0.0-20180824152047-4bcd98cce591 => github.com/golang/net v0.0.0-20180824152047-4bcd98cce591
-	golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87 => github.com/golang/sys v0.0.0-20180824143301-4910a1d54f87
-	golang.org/x/text v0.3.0 => github.com/golang/text v0.3.0
-	golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 => github.com/golang/time v0.0.0-20180412165947-fbb02b2291d2
-	google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 => github.com/google/go-genproto v0.0.0-20180817151627-c66870c02cf8
-	google.golang.org/grpc v1.14.0 => github.com/grpc/grpc-go v1.14.0
-)
-
 require (
+	cloud.google.com/go v0.28.0 // indirect
+	code.cloudfoundry.org/copilot v0.0.0-20180928002835-76734bdb9045 // indirect
+	fortio.org/fortio v1.3.0 // indirect
 	github.com/emicklei/go-restful-swagger12 v0.0.0-20170926063155-7524189396c6 // indirect
+	github.com/envoyproxy/go-control-plane v0.6.0
+
 	github.com/go-chassis/go-cc-client v0.0.0-20180831085349-c2bb6cef1640
 	github.com/go-chassis/go-chassis v0.8.3-0.20180914033538-0791a5cec8b4
 	github.com/go-chassis/gohessian v0.0.0-20180702061429-e5130c25af55
+	github.com/go-mesh/mesher-tools v0.0.0-20181006103649-cdc091b78a72
 	github.com/go-mesh/openlogging v0.0.0-20180831021158-f5d1c4e7e506
 	github.com/gogo/protobuf v1.1.1
+	github.com/gogo/status v1.0.3 // indirect
+	github.com/golang/groupcache v0.0.0-20180924190550-6f2cf27854a4 // indirect
+	github.com/golang/sync v0.0.0-20180314180146-1d60e4601c6f // indirect
+	github.com/google/go-github v17.0.0+incompatible // indirect
+	github.com/google/go-querystring v1.0.0 // indirect
+	github.com/google/uuid v1.0.0 // indirect
+	github.com/gorilla/context v1.1.1 // indirect
+	github.com/gorilla/mux v1.6.2 // indirect
+	github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
+	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
+	github.com/hashicorp/consul v1.2.3 // indirect
+	github.com/hashicorp/go-cleanhttp v0.5.0 // indirect
+	github.com/hashicorp/go-multierror v1.0.0 // indirect
+	github.com/hashicorp/go-rootcerts v0.0.0-20160503143440-6bb64b370b90 // indirect
+	github.com/hashicorp/serf v0.8.1 // indirect
+	github.com/howeyc/fsnotify v0.9.0 // indirect
+	github.com/inconshreveable/mousetrap v1.0.0 // indirect
+	github.com/mitchellh/go-homedir v1.0.0 // indirect
+	github.com/mitchellh/mapstructure v1.0.0 // indirect
+	github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
 	github.com/patrickmn/go-cache v2.1.0+incompatible
+	github.com/pkg/errors v0.8.0 // indirect
 	github.com/prometheus/client_golang v0.8.0
 	github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
+	github.com/prometheus/prom2json v0.0.0-20180620215746-7b8ed2aed129 // indirect
+	github.com/spf13/cobra v0.0.3 // indirect
 	github.com/stretchr/testify v1.2.2
 	github.com/urfave/cli v0.0.0-20180821064027-934abfb2f102
+	go.uber.org/atomic v1.3.2 // indirect
+	go.uber.org/multierr v1.1.0 // indirect
+	go.uber.org/zap v1.9.1 // indirect
+	golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be // indirect
+	google.golang.org/appengine v1.2.0 // indirect
 	google.golang.org/grpc v1.14.0
+	gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 // indirect
 	gopkg.in/yaml.v2 v2.2.1
+	istio.io/api v0.0.0-20180926203357-0cf306e2fd19 // indirect
+	istio.io/fortio v1.3.0 // indirect
+	istio.io/istio v0.0.0-20180929031539-a8986e2dc2c3
+	k8s.io/apiextensions-apiserver v0.0.0-20180925155151-ce69c54e57693220512104c84941e2ef1876449a // indirect
+	k8s.io/apimachinery v0.0.0-20180823151430-fda675fbe85280c4550452dae2a5ebf74e4a59b7
+	k8s.io/client-go v8.0.0+incompatible
+	k8s.io/cluster-registry v0.0.6 // indirect
+
+	k8s.io/ingress v0.0.0-20170803151325-fe19ebb09ee2 // indirect
+	k8s.io/kube-openapi v0.0.0-20180928070517-c01ed926f124 // indirect
+)
+
+replace (
+	cloud.google.com/go v0.28.0 => github.com/GoogleCloudPlatform/google-cloud-go v0.28.0
+	github.com/envoyproxy/go-control-plane v0.6.0 => github.com/envoyproxy/go-control-plane v0.0.0-20180918192855-2137d919632883e52e7786f55f0f84e52a44fbf3
+	github.com/kubernetes/client-go => ../k8s.io/client-go
+	golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac => github.com/golang/crypto v0.0.0-20180820150726-614d502a4dac
+	golang.org/x/net v0.0.0-20180724234803-3673e40ba225 => github.com/golang/net v0.0.0-20180724234803-3673e40ba225
+	golang.org/x/net v0.0.0-20180824152047-4bcd98cce591 => github.com/golang/net v0.0.0-20180824152047-4bcd98cce591
+
+	golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be => github.com/golang/oauth2 v0.0.0-20180821212333-d2e6202438be
+	golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87 => github.com/golang/sys v0.0.0-20180824143301-4910a1d54f87
+	golang.org/x/text v0.3.0 => github.com/golang/text v0.3.0
+	golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 => github.com/golang/time v0.0.0-20180412165947-fbb02b2291d2
+	google.golang.org/appengine v1.2.0 => github.com/golang/appengine v1.2.0
+	google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 => github.com/google/go-genproto v0.0.0-20180817151627-c66870c02cf8
+	google.golang.org/grpc v1.14.0 => github.com/grpc/grpc-go v1.14.0
 )
diff --git a/pkg/infras/istio/xds.go b/pkg/infras/istio/xds.go
new file mode 100644
index 0000000..25d95a2
--- /dev/null
+++ b/pkg/infras/istio/xds.go
@@ -0,0 +1,458 @@
+package pilotv2
+
+import (
+	"context"
+	"crypto/tls"
+	"encoding/json"
+	"fmt"
+	"strings"
+
+	apiv2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	apiv2core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+	apiv2endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
+	apiv2route "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
+	k8sinfra "github.com/go-mesh/mesher/pkg/infras/k8s"
+
+	"github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
+	"github.com/go-mesh/openlogging"
+	"github.com/gogo/protobuf/proto"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+	"k8s.io/client-go/rest"
+)
+
+//XdsClient provides the XDS API calls.
+type XdsClient struct {
+	PilotAddr   string
+	TlsConfig   *tls.Config
+	ReqCaches   map[XdsType]*XdsReqCache
+	nodeInfo    *NodeInfo
+	NodeID      string
+	NodeCluster string
+	k8sClient   *rest.RESTClient
+}
+
+//XdsType is a wrapper of string; valid values are "cds", "eds", "lds" and "rds"
+type XdsType string
+
+const (
+	TypeCds XdsType = "cds"
+	TypeEds XdsType = "eds"
+	TypeLds XdsType = "lds"
+	TypeRds XdsType = "rds"
+)
+
+//XdsReqCache stores the VersionInfo and Nonce for the XDS calls
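+// Per the xDS protocol, the Nonce and VersionInfo from the last DiscoveryResponse of a
+// given type are echoed back in the next DiscoveryRequest of that type to acknowledge it.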
+type XdsReqCache struct {
+	Nonce       string
+	VersionInfo string
+}
+
+//NodeInfo stores the info of the node, which will be used to make a
+//XDS call
+type NodeInfo struct {
+	PodName    string
+	Namespace  string
+	InstanceIP string
+}
+
+//XdsClusterInfo stores all the information parsed from a cluster name, which is in
+//the format direction|port|subset|hostName
+type XdsClusterInfo struct {
+	ClusterName  string
+	Direction    string
+	Port         string
+	Subset       string
+	HostName     string
+	ServiceName  string
+	Namespace    string
+	DomainSuffix string // DomainSuffix might not be used
+	Tags         map[string]string
+	Addrs        []string // The accessible addresses of the endpoints
+}
+
+//NewXdsClient returns the new XDS client.
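+//
+// Illustrative usage (the pilot address and kubeconfig path below are placeholders):
+//
+//	client, err := NewXdsClient("istio-pilot.istio-system:15010", nil, nodeInfo, kubeconfigPath)
+//	clusters, err := client.CDS()
+//	loadAssignment, err := client.EDS(clusters[0].Name)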
+func NewXdsClient(pilotAddr string, tlsConfig *tls.Config, nodeInfo *NodeInfo, kubeconfigPath string) (*XdsClient, error) {
+	// TODO Handle the array
+	xdsClient := &XdsClient{
+		PilotAddr: pilotAddr,
+		nodeInfo:  nodeInfo,
+	}
+	xdsClient.NodeID = "sidecar~" + nodeInfo.InstanceIP + "~" + nodeInfo.PodName + "~" + nodeInfo.Namespace
+	xdsClient.NodeCluster = nodeInfo.PodName
+
+	xdsClient.ReqCaches = map[XdsType]*XdsReqCache{
+		TypeCds: {},
+		TypeEds: {},
+		TypeLds: {},
+		TypeRds: {},
+	}
+
+	k8sClient, err := k8sinfra.CreateK8SRestClient(kubeconfigPath, "apis", "networking.istio.io", "v1alpha3")
+	if err != nil {
+		return nil, err
+	}
+	xdsClient.k8sClient = k8sClient
+
+	return xdsClient, nil
+}
+
+//GetSubsetTags returns the tags of the specified subset.
+func (client *XdsClient) GetSubsetTags(namespace, hostName, subsetName string) (map[string]string, error) {
+	req := client.k8sClient.Get()
+	req.Resource("destinationrules")
+	req.Namespace(namespace)
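+	// This issues GET /apis/networking.istio.io/v1alpha3/namespaces/{namespace}/destinationrules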
+
+	result := req.Do()
+	rawBody, err := result.Raw()
+	if err != nil {
+		return nil, err
+	}
+
+	var drResult k8sinfra.DestinationRuleResult
+	if err := json.Unmarshal(rawBody, &drResult); err != nil {
+		return nil, err
+	}
+
+	// Find the subset
+	tags := map[string]string{}
+	for _, dr := range drResult.Items {
+		if dr.Spec.Host == hostName {
+			for _, subset := range dr.Spec.Subsets {
+				if subset.Name == subsetName {
+					for k, v := range subset.Labels {
+						tags[k] = v
+					}
+					break
+				}
+			}
+			break
+		}
+	}
+
+	return tags, nil
+}
+
+func (client *XdsClient) getGrpcConn() (*grpc.ClientConn, error) {
+	var conn *grpc.ClientConn
+	var err error
+	if client.TlsConfig != nil {
+		creds := credentials.NewTLS(client.TlsConfig)
+		conn, err = grpc.Dial(client.PilotAddr, grpc.WithTransportCredentials(creds))
+	} else {
+		conn, err = grpc.Dial(client.PilotAddr, grpc.WithInsecure())
+	}
+
+	return conn, err
+}
+
+func getAdsResClient(client *XdsClient) (v2.AggregatedDiscoveryService_StreamAggregatedResourcesClient, *grpc.ClientConn, error) {
+	conn, err := client.getGrpcConn()
+	if err != nil {
+		return nil, nil, err
+	}
+
+	adsClient := v2.NewAggregatedDiscoveryServiceClient(conn)
+	adsResClient, err := adsClient.StreamAggregatedResources(context.Background())
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return adsResClient, conn, nil
+}
+
+func (client *XdsClient) getRouterClusters(clusterName string) ([]string, error) {
+	virtualHosts, err := client.RDS(clusterName)
+	if err != nil {
+		return nil, err
+	}
+
+	routerClusters := []string{}
+	for _, h := range virtualHosts {
+		for _, r := range h.Routes {
+			routerClusters = append(routerClusters, r.GetRoute().GetCluster())
+		}
+	}
+
+	return routerClusters, nil
+}
+
+func (client *XdsClient) getVersionInfo(resType XdsType) string {
+	return client.ReqCaches[resType].VersionInfo
+}
+func (client *XdsClient) getNonce(resType XdsType) string {
+	return client.ReqCaches[resType].Nonce
+}
+
+func (client *XdsClient) setVersionInfo(resType XdsType, versionInfo string) {
+	client.ReqCaches[resType].VersionInfo = versionInfo
+}
+
+func (client *XdsClient) setNonce(resType XdsType, nonce string) {
+	client.ReqCaches[resType].Nonce = nonce
+}
+
+//CDS is the Cluster Discovery Service API, which fetches all the clusters from istio pilot
+func (client *XdsClient) CDS() ([]apiv2.Cluster, error) {
+	adsResClient, conn, err := getAdsResClient(client)
+	if err != nil {
+		return nil, err
+	}
+	defer conn.Close()
+
+	req := &apiv2.DiscoveryRequest{
+		TypeUrl:       "type.googleapis.com/envoy.api.v2.Cluster",
+		VersionInfo:   client.getVersionInfo(TypeCds),
+		ResponseNonce: client.getNonce(TypeCds),
+	}
+	req.Node = &apiv2core.Node{
+		// Sample taken from istio: router~172.30.77.6~istio-egressgateway-84b4d947cd-rqt45.istio-system~istio-system.svc.cluster.local-2
+		// The Node.Id should be in the format {nodeType}~{ipAddr}~{serviceId}~{domain}, separated by '~'
+		// The format is required by pilot
+		Id:      client.NodeID,
+		Cluster: client.NodeCluster,
+	}
+
+	if err := adsResClient.Send(req); err != nil {
+		return nil, err
+	}
+
+	resp, err := adsResClient.Recv()
+	if err != nil {
+		return nil, err
+	}
+
+	client.setNonce(TypeCds, resp.GetNonce())
+	client.setVersionInfo(TypeCds, resp.GetVersionInfo())
+	resources := resp.GetResources()
+
+	var cluster apiv2.Cluster
+	clusters := []apiv2.Cluster{}
+	for _, res := range resources {
+		if err := proto.Unmarshal(res.GetValue(), &cluster); err != nil {
+			openlogging.GetLogger().Warnf("Failed to unmarshal cluster resource: %s", err.Error())
+		} else {
+			clusters = append(clusters, cluster)
+		}
+	}
+	return clusters, nil
+}
+
+//EDS is the Endpoint Discovery Service API; it takes a cluster's name and returns all of its endpoints (each providing an address and port)
+func (client *XdsClient) EDS(clusterName string) (*apiv2.ClusterLoadAssignment, error) {
+	adsResClient, conn, err := getAdsResClient(client)
+	if err != nil {
+		return nil, err
+	}
+	defer conn.Close()
+
+	req := &apiv2.DiscoveryRequest{
+		TypeUrl:       "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
+		VersionInfo:   client.getVersionInfo(TypeEds),
+		ResponseNonce: client.getNonce(TypeEds),
+	}
+
+	req.Node = &apiv2core.Node{
+		Id:      client.NodeID,
+		Cluster: client.NodeCluster,
+	}
+	req.ResourceNames = []string{clusterName}
+	if err := adsResClient.Send(req); err != nil {
+		return nil, err
+	}
+
+	resp, err := adsResClient.Recv()
+	if err != nil {
+		return nil, err
+	}
+
+	resources := resp.GetResources()
+	client.setNonce(TypeEds, resp.GetNonce())
+	client.setVersionInfo(TypeEds, resp.GetVersionInfo())
+
+	var loadAssignment apiv2.ClusterLoadAssignment
+	var e error
+	// endpoints := []apiv2.ClusterLoadAssignment{}
+
+	for _, res := range resources {
+		if err := proto.Unmarshal(res.GetValue(), &loadAssignment); err != nil {
+			e = err
+		} else {
+			// A cluster always has exactly one LoadAssignment, whose Endpoints field holds the endpoints
+			break
+		}
+	}
+	return &loadAssignment, e
+}
+
+//GetEndpointsByTags fetches the cluster's endpoints filtered by tags. The tags are usually specified in a DestinationRule subset.
+func (client *XdsClient) GetEndpointsByTags(serviceName string, tags map[string]string) ([]apiv2endpoint.LbEndpoint, string, error) {
+	clusters, err := client.CDS()
+	if err != nil {
+		return nil, "", err
+	}
+
+	lbendpoints := []apiv2endpoint.LbEndpoint{}
+	clusterName := ""
+	for _, cluster := range clusters {
+		clusterInfo := ParseClusterName(cluster.Name)
+		if clusterInfo == nil || clusterInfo.Subset == "" || clusterInfo.ServiceName != serviceName {
+			continue
+		}
+		// So clusterInfo is not nil and subset is not empty
+		if subsetTags, err := client.GetSubsetTags(clusterInfo.Namespace, clusterInfo.ServiceName, clusterInfo.Subset); err == nil {
+			// filter with tags
+			matched := true
+			for k, v := range tags {
+				if subsetTagValue, exists := subsetTags[k]; !exists || subsetTagValue != v {
+					matched = false
+					break
+				}
+			}
+
+			if matched { // We got the cluster!
+				clusterName = cluster.Name
+				loadAssignment, err := client.EDS(cluster.Name)
+				if err != nil {
+					return nil, clusterName, err
+				}
+
+				for _, item := range loadAssignment.Endpoints {
+					lbendpoints = append(lbendpoints, item.LbEndpoints...)
+				}
+
+				return lbendpoints, clusterName, nil
+			}
+		}
+	}
+
+	return lbendpoints, clusterName, nil
+}
+
+//RDS is the Route Discovery Service API; it returns the virtual hosts that contain Routes
+func (client *XdsClient) RDS(clusterName string) ([]apiv2route.VirtualHost, error) {
+	clusterInfo := ParseClusterName(clusterName)
+	if clusterInfo == nil {
+		return nil, fmt.Errorf("Invalid clusterName for routers: %s", clusterName)
+	}
+
+	adsResClient, conn, err := getAdsResClient(client)
+	if err != nil {
+		return nil, err
+	}
+	defer conn.Close()
+
+	req := &apiv2.DiscoveryRequest{
+		TypeUrl:       "type.googleapis.com/envoy.api.v2.RouteConfiguration",
+		VersionInfo:   client.getVersionInfo(TypeRds),
+		ResponseNonce: client.getNonce(TypeRds),
+	}
+
+	req.Node = &apiv2core.Node{
+		Id:      client.NodeID,
+		Cluster: client.NodeCluster,
+	}
+	req.ResourceNames = []string{clusterName}
+	if err := adsResClient.Send(req); err != nil {
+		return nil, err
+	}
+
+	resp, err := adsResClient.Recv()
+	if err != nil {
+		return nil, err
+	}
+
+	resources := resp.GetResources()
+	client.setNonce(TypeRds, resp.GetNonce())
+	client.setVersionInfo(TypeRds, resp.GetVersionInfo())
+
+	var route apiv2.RouteConfiguration
+	virtualHosts := []apiv2route.VirtualHost{}
+
+	for _, res := range resources {
+		if err := proto.Unmarshal(res.GetValue(), &route); err != nil {
+			openlogging.GetLogger().Warnf("Failed to unmarshal route resource: %s", err.Error())
+		} else {
+			vhosts := route.GetVirtualHosts()
+			for _, vhost := range vhosts {
+				if vhost.Name == clusterInfo.ServiceName+":"+clusterInfo.Port {
+					virtualHosts = append(virtualHosts, vhost)
+				}
+			}
+		}
+	}
+	return virtualHosts, nil
+}
+
+//LDS is the Listener Discovery Service API, which returns all the listeners
+func (client *XdsClient) LDS() ([]apiv2.Listener, error) {
+	adsResClient, conn, err := getAdsResClient(client)
+	if err != nil {
+		return nil, err
+	}
+	defer conn.Close()
+
+	req := &apiv2.DiscoveryRequest{
+		TypeUrl:       "type.googleapis.com/envoy.api.v2.Listener",
+		VersionInfo:   client.getVersionInfo(TypeLds),
+		ResponseNonce: client.getNonce(TypeLds),
+	}
+
+	req.Node = &apiv2core.Node{
+		Id:      client.NodeID,
+		Cluster: client.NodeCluster,
+	}
+	if err := adsResClient.Send(req); err != nil {
+		return nil, err
+	}
+
+	resp, err := adsResClient.Recv()
+	if err != nil {
+		return nil, err
+	}
+
+	resources := resp.GetResources()
+	client.setNonce(TypeLds, resp.GetNonce())
+	client.setVersionInfo(TypeLds, resp.GetVersionInfo())
+
+	var listener apiv2.Listener
+	listeners := []apiv2.Listener{}
+
+	for _, res := range resources {
+		if err := proto.Unmarshal(res.GetValue(), &listener); err != nil {
+			openlogging.GetLogger().Warnf("Failed to unmarshal listener resource: %s", err.Error())
+		} else {
+			listeners = append(listeners, listener)
+		}
+	}
+	return listeners, nil
+}
+
+//ParseClusterName parses a cluster name in the format direction|port|subset|hostName into its separate fields. The hostName item is further parsed into ServiceName, Namespace, etc.
+func ParseClusterName(clusterName string) *XdsClusterInfo {
+	// clusterName format: direction|port|subset|hostName
+	// hostName format: serviceName.namespace.svc.cluster.local
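+	// e.g. "outbound|9080|v2|reviews.default.svc.cluster.local" parses to Direction=outbound,
+	// Port=9080, Subset=v2, ServiceName=reviews, Namespace=default, DomainSuffix=svc.cluster.local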
+
+	parts := strings.Split(clusterName, "|")
+	if len(parts) != 4 {
+		return nil
+	}
+
+	hostnameParts := strings.Split(parts[3], ".")
+	if len(hostnameParts) < 2 {
+		return nil
+	}
+
+	cluster := &XdsClusterInfo{
+		Direction:    parts[0],
+		Port:         parts[1],
+		Subset:       parts[2],
+		HostName:     parts[3],
+		ServiceName:  hostnameParts[0],
+		Namespace:    hostnameParts[1],
+		DomainSuffix: strings.Join(hostnameParts[2:], "."),
+		ClusterName:  clusterName,
+	}
+
+	return cluster
+}
diff --git a/pkg/infras/istio/xds_test.go b/pkg/infras/istio/xds_test.go
new file mode 100644
index 0000000..be3c1e1
--- /dev/null
+++ b/pkg/infras/istio/xds_test.go
@@ -0,0 +1,248 @@
+package pilotv2
+
+import (
+	"os"
+	"os/user"
+	"testing"
+	"time"
+
+	apiv2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	"github.com/go-chassis/go-chassis/core/lager"
+	"github.com/go-chassis/go-chassis/pkg/util/iputil"
+	testutil "github.com/go-mesh/mesher-tools/test/util"
+	"istio.io/istio/tests/util"
+)
+
+const (
+	TEST_POD_NAME     = "testpod"
+	NAMESPACE_DEFAULT = "default"
+)
+
+var (
+	ValidXdsClient *XdsClient
+	TestClusters   []apiv2.Cluster
+)
+var (
+	KubeConfig     string
+	ValidPilotAddr string
+	LocalIPAddress string
+	nodeInfo       *NodeInfo
+
+	err error
+)
+
+func TestMain(t *testing.T) {
+	lager.Initialize("", "DEBUG", "", "size", true, 1, 10, 7)
+	// Get kube config path and local ip
+	if KUBE_CONFIG := os.Getenv("KUBE_CONFIG"); KUBE_CONFIG != "" {
+		KubeConfig = KUBE_CONFIG
+	} else {
+		usr, err := user.Current()
+		if err != nil {
+			panic("Failed to get current user info: " + err.Error())
+		} else {
+			KubeConfig = usr.HomeDir + "/" + ".kube/config"
+		}
+	}
+
+	if PILOT_ADDR := os.Getenv("PILOT_ADDR"); PILOT_ADDR != "" {
+		ValidPilotAddr = PILOT_ADDR
+	} else {
+		// panic("PILOT_ADDR should be specified to pass the pilot address")
+		testutil.InitLocalPilotTestEnv(t)
+		ValidPilotAddr = util.MockPilotGrpcAddr
+	}
+
+	if INSTANCE_IP := os.Getenv("INSTANCE_IP"); INSTANCE_IP != "" {
+		LocalIPAddress = INSTANCE_IP
+	} else if LocalIPAddress = iputil.GetLocalIP(); LocalIPAddress == "" {
+		panic("Failed to get the local ip address, please check the network environment")
+	}
+
+	nodeInfo = &NodeInfo{
+		PodName:    TEST_POD_NAME,
+		Namespace:  NAMESPACE_DEFAULT,
+		InstanceIP: LocalIPAddress,
+	}
+}
+
+func TestNewXdsClient(t *testing.T) {
+	client, err := NewXdsClient(ValidPilotAddr, nil, nodeInfo, KubeConfig)
+
+	if err != nil {
+		t.Errorf("Failed to create xds client: %s", err.Error())
+	}
+
+	ValidXdsClient = client
+}
+
+func TestCDS(t *testing.T) {
+	clusters, err := ValidXdsClient.CDS()
+	if err != nil {
+		t.Errorf("Failed to get clusters by CDS: %s", err.Error())
+	}
+
+	t.Logf("Got %d clusters\n", len(clusters))
+	TestClusters = clusters
+}
+
+func TestEDS(t *testing.T) {
+	if len(TestClusters) == 0 { // With istio, there should always be clusters
+		t.Errorf("No clusters found")
+	}
+
+	loadAssignment, err := ValidXdsClient.EDS(TestClusters[0].Name)
+	if err != nil {
+		t.Errorf("Failed to get endpoints by EDS: %s", err.Error())
+	}
+
+	if loadAssignment == nil {
+		t.Error("Failed to get load assignment with EDS")
+	}
+}
+
+func TestRDS(t *testing.T) {
+	targetClusterName := ""
+	for _, c := range TestClusters {
+		info := ParseClusterName(c.Name)
+		if info != nil {
+			targetClusterName = c.Name
+			break
+		}
+	}
+
+	if targetClusterName == "" {
+		t.Log("No valid cluster found")
+	}
+
+	_, err := ValidXdsClient.RDS(targetClusterName)
+	if err != nil {
+		t.Errorf("Failed to get routers: %s", err.Error())
+	}
+}
+
+func TestLDS(t *testing.T) {
+	listeners, err := ValidXdsClient.LDS()
+	if err != nil {
+		t.Errorf("Failed to get listeners with LDS: %s", err.Error())
+	}
+
+	t.Logf("%d listeners found\n", len(listeners))
+}
+
+func TestNonce(t *testing.T) {
+	nowStr := time.Now().String()
+	ValidXdsClient.setNonce(TypeCds, nowStr)
+	ValidXdsClient.setNonce(TypeEds, nowStr)
+	ValidXdsClient.setNonce(TypeRds, nowStr)
+	ValidXdsClient.setNonce(TypeLds, nowStr)
+
+	cdsNonce := ValidXdsClient.getNonce(TypeCds)
+	if cdsNonce != nowStr {
+		t.Errorf("Failed to test nonce: %s should be equal to %s", cdsNonce, nowStr)
+	}
+
+	edsNonce := ValidXdsClient.getNonce(TypeEds)
+	if edsNonce != nowStr {
+		t.Errorf("Failed to test nonce: %s should be equal to %s", edsNonce, nowStr)
+	}
+
+	ldsNonce := ValidXdsClient.getNonce(TypeLds)
+	if ldsNonce != nowStr {
+		t.Errorf("Failed to test nonce: %s should be equal to %s", ldsNonce, nowStr)
+	}
+
+	rdsNonce := ValidXdsClient.getNonce(TypeRds)
+	if rdsNonce != nowStr {
+		t.Errorf("Failed to test nonce: %s should be equal to %s", rdsNonce, nowStr)
+	}
+}
+
+func TestVersionInfo(t *testing.T) {
+	nowStr := time.Now().String()
+	ValidXdsClient.setVersionInfo(TypeCds, nowStr)
+	ValidXdsClient.setVersionInfo(TypeEds, nowStr)
+	ValidXdsClient.setVersionInfo(TypeRds, nowStr)
+	ValidXdsClient.setVersionInfo(TypeLds, nowStr)
+
+	cdsVersionInfo := ValidXdsClient.getVersionInfo(TypeCds)
+	if cdsVersionInfo != nowStr {
+		t.Errorf("Failed to test VersionInfo: %s should be equal to %s", cdsVersionInfo, nowStr)
+	}
+
+	edsVersionInfo := ValidXdsClient.getVersionInfo(TypeEds)
+	if edsVersionInfo != nowStr {
+		t.Errorf("Failed to test VersionInfo: %s should be equal to %s", edsVersionInfo, nowStr)
+	}
+
+	ldsVersionInfo := ValidXdsClient.getVersionInfo(TypeLds)
+	if ldsVersionInfo != nowStr {
+		t.Errorf("Failed to test VersionInfo: %s should be equal to %s", ldsVersionInfo, nowStr)
+	}
+
+	rdsVersionInfo := ValidXdsClient.getVersionInfo(TypeRds)
+	if rdsVersionInfo != nowStr {
+		t.Errorf("Failed to test VersionInfo: %s should be equal to %s", rdsVersionInfo, nowStr)
+	}
+}
+
+func TestGetSubsetTags(t *testing.T) {
+	var targetClusterInfo *XdsClusterInfo = nil
+	for _, c := range TestClusters {
+		if info := ParseClusterName(c.Name); info != nil && info.Subset != "" {
+			targetClusterInfo = info
+			break
+		}
+	}
+
+	if targetClusterInfo == nil {
+		t.Log("No tagged services in test environment, skip")
+	} else {
+		tags, err := ValidXdsClient.GetSubsetTags(targetClusterInfo.Namespace, targetClusterInfo.ServiceName, targetClusterInfo.Subset)
+		if err != nil {
+			t.Errorf("Failed to get subset tags: %s", err.Error())
+		} else if len(tags) == 0 {
+			t.Logf("Should not return empty tags for %s", targetClusterInfo.ClusterName)
+		}
+	}
+}
+
+func TestGetAdsResClient(t *testing.T) {
+	_, conn, err := getAdsResClient(ValidXdsClient)
+
+	if err != nil {
+		t.Errorf("Failed to get ads resource client: %s", err.Error())
+	}
+	conn.Close()
+}
+
+func TestParseClusterName(t *testing.T) {
+	validClusterName := "inbound|3030||client.default.svc.cluster.local"
+
+	clusterInfo := ParseClusterName(validClusterName)
+
+	if clusterInfo == nil {
+		t.Errorf("Failed to parse cluster name: %s, should return cluster info", validClusterName)
+	}
+	if clusterInfo.Direction != "inbound" {
+		t.Errorf("Failed to parse cluster name: %s, direction should be inbound", validClusterName)
+	}
+	if clusterInfo.Port != "3030" {
+		t.Errorf("Failed to parse cluster name: %s, port should be 3030", validClusterName)
+	}
+	if clusterInfo.ServiceName != "client" {
+		t.Errorf("Failed to parse cluster name: %s, servicename should be client", validClusterName)
+	}
+
+	invalidClusterName := "BlackHoleCluster"
+	clusterInfo = ParseClusterName(invalidClusterName)
+	if clusterInfo != nil {
+		t.Errorf("Failed to parse cluster name: %s, should return nil", validClusterName)
+	}
+
+	invalidClusterName = "outbound|9080|v2|black"
+	clusterInfo = ParseClusterName(invalidClusterName)
+	if clusterInfo != nil {
+		t.Errorf("Failed to parse cluster name: %s, should return nil", validClusterName)
+	}
+}
diff --git a/pkg/infras/k8s/k8s.go b/pkg/infras/k8s/k8s.go
new file mode 100644
index 0000000..998d652
--- /dev/null
+++ b/pkg/infras/k8s/k8s.go
@@ -0,0 +1,60 @@
+package pilotv2
+
+import (
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+)
+
+//DestinationRuleResult is the list of MinDestinationRules
+type DestinationRuleResult struct {
+	Items []MinDestinationRule
+}
+
+// MinDestinationRule is the minimum structure we need to get subsets
+type MinDestinationRule struct {
+	Metadata struct {
+		Name      string `json:"name"`
+		Namespace string `json:"namespace"`
+	} `json:"metadata"`
+	Spec struct {
+		Host    string `json:"host"`
+		Subsets []struct {
+			Labels map[string]string `json:"labels"`
+			Name   string            `json:"name"`
+		} `json:"subsets"`
+	} `json:"spec"`
+}
+
+//CreateK8SRestClient returns the kubernetes client for RESTful API calls
+func CreateK8SRestClient(kubeconfig, apiPath, group, version string) (*rest.RESTClient, error) {
+	var config *rest.Config
+	var err error
+	if kubeconfig != "" {
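+		// Prefer the given kubeconfig file; fall back to in-cluster config if it cannot be loaded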
+		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
+		if err != nil {
+			config, err = rest.InClusterConfig()
+		}
+	} else {
+		config, err = rest.InClusterConfig()
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	config.APIPath = apiPath
+	config.GroupVersion = &schema.GroupVersion{
+		Group:   group,
+		Version: version,
+	}
+	config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: serializer.NewCodecFactory(runtime.NewScheme())}
+
+	k8sRestClient, err := rest.RESTClientFor(config)
+	if err != nil {
+		return nil, err
+	}
+	return k8sRestClient, nil
+}
diff --git a/pkg/infras/k8s/k8s_test.go b/pkg/infras/k8s/k8s_test.go
new file mode 100644
index 0000000..3a6992c
--- /dev/null
+++ b/pkg/infras/k8s/k8s_test.go
@@ -0,0 +1,42 @@
+package pilotv2
+
+import (
+	"os"
+	"os/user"
+	"testing"
+)
+
+var KubeConfig string
+
+func init() {
+	if KUBE_CONFIG := os.Getenv("KUBE_CONFIG"); KUBE_CONFIG != "" {
+		KubeConfig = KUBE_CONFIG
+	} else {
+		usr, err := user.Current()
+		if err != nil {
+			panic("Failed to get current user info: " + err.Error())
+		} else {
+			KubeConfig = usr.HomeDir + "/" + ".kube/config"
+		}
+	}
+
+}
+
+func TestCreateK8sClient(t *testing.T) {
+	_, err := CreateK8SRestClient(KubeConfig, "apis", "networking.istio.io", "v1alpha3")
+	if err != nil {
+		t.Errorf("Failed to create k8s rest client: %s", err.Error())
+	}
+
+	_, err = CreateK8SRestClient("*nonfile", "apis", "networking.istio.io", "v1alpha3")
+	if err == nil {
+		t.Errorf("Test failed, should return error with invalid kube config path")
+	}
+}
+
+func TestCreateInvalidK8sClient(t *testing.T) {
+	_, err := CreateK8SRestClient("", "apis", "networking.istio.io", "v1alpha3")
+	if err == nil {
+		t.Errorf("Creating a client with an empty kubeconfig path outside a cluster should return an error")
+	}
+}
diff --git a/plugins/registry/istiov2/cache.go b/plugins/registry/istiov2/cache.go
new file mode 100644
index 0000000..ec05328
--- /dev/null
+++ b/plugins/registry/istiov2/cache.go
@@ -0,0 +1,286 @@
+package istiov2
+
+import (
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	apiv2endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
+	istioinfra "github.com/go-mesh/mesher/pkg/infras/istio"
+
+	"github.com/go-chassis/go-chassis/core/archaius"
+	"github.com/go-chassis/go-chassis/core/common"
+	"github.com/go-chassis/go-chassis/core/config"
+	"github.com/go-chassis/go-chassis/core/lager"
+	"github.com/go-chassis/go-chassis/core/registry"
+)
+
+const (
+	DefaultRefreshInterval = time.Second * 30
+)
+
+var simpleCache *EndpointCache
+
+func init() {
+	simpleCache = &EndpointCache{
+		cache: map[string]EndpointSubset{},
+	}
+}
+
+//CacheManager manages the caches for istio pilot results.
+type CacheManager struct {
+	xdsClient *istioinfra.XdsClient
+}
+
+//AutoSync fetches and updates the cluster and endpoint info periodically
+func (cm *CacheManager) AutoSync() {
+	cm.refreshCache()
+
+	var ticker *time.Ticker
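+	// The refresh interval comes from the go-chassis service discovery config;
+	// an empty or invalid value falls back to DefaultRefreshInterval (30s).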
+	refreshInterval := config.GetServiceDiscoveryRefreshInterval()
+	if refreshInterval == "" {
+		ticker = time.NewTicker(DefaultRefreshInterval)
+	} else {
+		timeValue, err := time.ParseDuration(refreshInterval)
+		if err != nil {
+			lager.Logger.Errorf("refreshInterval is invalid, using default value: %s", err.Error())
+			timeValue = DefaultRefreshInterval
+		}
+
+		ticker = time.NewTicker(timeValue)
+	}
+	go func() {
+		for range ticker.C {
+			cm.refreshCache()
+		}
+	}()
+}
+
+func (cm *CacheManager) refreshCache() {
+	// TODO What is the design of autodiscovery
+	if archaius.GetBool("cse.service.registry.autodiscovery", false) {
+		lager.Logger.Errorf("SyncPilotEndpoints failed: not supported")
+	}
+
+	err := cm.pullMicroserviceInstance()
+	if err != nil {
+		lager.Logger.Errorf("AutoUpdateMicroserviceInstance failed: %s", err.Error())
+	}
+
+	if archaius.GetBool("cse.service.registry.autoSchemaIndex", false) {
+		lager.Logger.Errorf("MakeSchemaIndex failed: Not support operation")
+	}
+
+	if archaius.GetBool("cse.service.registry.autoIPIndex", false) {
+		err = cm.MakeIPIndex()
+		if err != nil {
+			lager.Logger.Errorf("Auto Update IP index failed: %s", err.Error())
+		}
+	}
+}
+
+func (cm *CacheManager) pullMicroserviceInstance() error {
+	clusterInfos, err := cm.getClusterInfos()
+	if err != nil {
+		return err
+	}
+
+	for _, clusterInfo := range clusterInfos {
+		if clusterInfo.Subset != "" {
+			// Update the cache
+			instances := []*registry.MicroServiceInstance{}
+			for _, addr := range clusterInfo.Addrs {
+				msi := &registry.MicroServiceInstance{}
+				msi.InstanceID = strings.Replace(addr, ":", "_", -1)
+				msi.HostName = clusterInfo.ClusterName
+				msi.EndpointsMap = map[string]string{
+					common.ProtocolRest: addr,
+				}
+				msi.DefaultEndpoint = addr
+				msi.DefaultProtocol = common.ProtocolRest
+				msi.Metadata = clusterInfo.Tags
+
+				instances = append(instances, msi)
+			}
+
+			endpointSubset := EndpointSubset{
+				tags:       clusterInfo.Tags,
+				instances:  instances,
+				subsetName: clusterInfo.Subset,
+			}
+			simpleCache.Set(clusterInfo.ClusterName, endpointSubset)
+		}
+	}
+
+	return nil
+}
+
+//MakeIPIndex caches the cluster info with address as the key
+func (cm *CacheManager) MakeIPIndex() error {
+	// TODO Use getClusterInfo to replace the logic
+	clusterInfos, err := cm.getClusterInfos()
+	if err != nil {
+		return err
+	}
+
+	for _, clusterInfo := range clusterInfos {
+		for _, addr := range clusterInfo.Addrs {
+			si := &registry.SourceInfo{}
+			// TODO Get tags by subset and put them into si.Tags
+			si.Name = clusterInfo.ClusterName
+			si.Tags = clusterInfo.Tags
+			registry.SetIPIndex(addr, si)
+			// TODO Why don't we have to index every endpoint?
+			// break
+		}
+	}
+
+	return nil
+}
+
+//NewCacheManager creates the CacheManager instance.
+func NewCacheManager(xdsClient *istioinfra.XdsClient) (*CacheManager, error) {
+	cacheManager := &CacheManager{
+		xdsClient: xdsClient,
+	}
+
+	return cacheManager, nil
+}
+
+func (cm *CacheManager) getClusterInfos() ([]istioinfra.XdsClusterInfo, error) {
+	clusterInfos := []istioinfra.XdsClusterInfo{}
+
+	clusters, err := cm.xdsClient.CDS()
+	if err != nil {
+		return nil, err
+	}
+
+	for _, cluster := range clusters {
+		// xDS v2 API: CDS does not return the cluster's endpoints; call EDS to get them
+
+		clusterInfo := istioinfra.ParseClusterName(cluster.Name)
+		if clusterInfo == nil {
+			continue
+		}
+
+		// Get Tags
+		if clusterInfo.Subset != "" { // Only clusters with subset contain labels
+			if tags, err := cm.xdsClient.GetSubsetTags(clusterInfo.Namespace, clusterInfo.ServiceName, clusterInfo.Subset); err == nil {
+				clusterInfo.Tags = tags
+			}
+		}
+
+		// Get cluster instances' addresses
+		loadAssignment, err := cm.xdsClient.EDS(cluster.Name)
+		if err != nil {
+			return nil, err
+		}
+		endpoints := loadAssignment.Endpoints
+		for _, endpoint := range endpoints {
+			for _, lbendpoint := range endpoint.LbEndpoints {
+				socketAddress := lbendpoint.Endpoint.Address.GetSocketAddress()
+				addr := socketAddress.GetAddress()
+				port := socketAddress.GetPortValue()
+				ipAddr := addr + ":" + strconv.FormatUint(uint64(port), 10)
+				clusterInfo.Addrs = append(clusterInfo.Addrs, ipAddr)
+			}
+		}
+
+		clusterInfos = append(clusterInfos, *clusterInfo)
+	}
+	return clusterInfos, nil
+}
+
+// TODO Cache with registry index cache
+func updateInstanceIndexCache(lbendpoints []apiv2endpoint.LbEndpoint, clusterName string, tags map[string]string) {
+	if len(lbendpoints) == 0 {
+		simpleCache.Delete(clusterName)
+		return
+	}
+
+	instances := make([]*registry.MicroServiceInstance, 0, len(lbendpoints))
+	for _, lbendpoint := range lbendpoints {
+		msi := toMicroServiceInstance(clusterName, &lbendpoint, tags)
+		instances = append(instances, msi)
+	}
+	subset := EndpointSubset{
+		tags:      tags,
+		instances: instances,
+	}
+
+	info := istioinfra.ParseClusterName(clusterName)
+	if info != nil {
+		subset.subsetName = info.Subset
+	}
+
+	simpleCache.Set(clusterName, subset)
+}
+
+//EndpointCache caches the clusters' endpoint and tags
+type EndpointCache struct {
+	mux   sync.Mutex
+	cache map[string]EndpointSubset
+}
+
+//EndpointSubset stores the tags and instances of a service
+type EndpointSubset struct {
+	subsetName string
+	tags       map[string]string
+	instances  []*registry.MicroServiceInstance
+}
+
+//Delete removes the cached instances of the specified cluster
+func (c *EndpointCache) Delete(clusterName string) {
+	c.mux.Lock()
+	delete(c.cache, clusterName)
+	c.mux.Unlock()
+}
+
+//Set updates the cluster's instance info
+func (c *EndpointCache) Set(clusterName string, subset EndpointSubset) {
+	c.mux.Lock()
+	c.cache[clusterName] = subset
+	c.mux.Unlock()
+}
+
+//GetWithTags returns the instances of the service, filtered with tags
+func (c *EndpointCache) GetWithTags(serviceName string, tags map[string]string) []*registry.MicroServiceInstance {
+	// Get subsets whose clusterName matches the service name
+	matchedSubsets := []EndpointSubset{}
+	c.mux.Lock()
+	for clusterName, subset := range c.cache {
+		info := istioinfra.ParseClusterName(clusterName)
+		if info != nil && info.ServiceName == serviceName {
+			matchedSubsets = append(matchedSubsets, subset)
+		}
+	}
+	c.mux.Unlock()
+
+	if len(matchedSubsets) == 0 {
+		return nil
+	}
+
+	var instances []*registry.MicroServiceInstance
+
+	for _, subset := range matchedSubsets {
+		if tagsMatch(subset.tags, tags) {
+			instances = subset.instances
+			break
+		}
+
+	}
+	return instances
+}
+
+// TODO There might be some utils in go-chassis doing the same thing
+func tagsMatch(tags, targetTags map[string]string) bool {
+	matched := true
+	for k, v := range targetTags {
+		if val, exists := tags[k]; !exists || val != v {
+			matched = false
+			break
+		}
+	}
+	return matched
+}
diff --git a/plugins/registry/istiov2/cache_test.go b/plugins/registry/istiov2/cache_test.go
new file mode 100644
index 0000000..6eedd34
--- /dev/null
+++ b/plugins/registry/istiov2/cache_test.go
@@ -0,0 +1,126 @@
+package istiov2
+
+import (
+	"os"
+	"os/user"
+	"testing"
+
+	"github.com/go-chassis/go-chassis/core/lager"
+	"github.com/go-chassis/go-chassis/core/registry"
+	"github.com/go-chassis/go-chassis/pkg/util/iputil"
+	testutil "github.com/go-mesh/mesher-tools/test/util"
+	istioinfra "github.com/go-mesh/mesher/pkg/infras/istio"
+	"istio.io/istio/tests/util"
+)
+
+const (
+	TEST_POD_NAME     = "testpod"
+	NAMESPACE_DEFAULT = "default"
+)
+
+var (
+	KubeConfig     string
+	ValidPilotAddr string
+	LocalIPAddress string
+	nodeInfo       *istioinfra.NodeInfo
+
+	testXdsClient    *istioinfra.XdsClient
+	testCacheManager *CacheManager
+	err              error
+)
+
+func TestMain(t *testing.T) {
+	lager.Initialize("", "DEBUG", "", "size", true, 1, 10, 7)
+	// Get kube config path and local ip
+	if KUBE_CONFIG := os.Getenv("KUBE_CONFIG"); KUBE_CONFIG != "" {
+		KubeConfig = KUBE_CONFIG
+	} else {
+		usr, err := user.Current()
+		if err != nil {
+			panic("Failed to get current user info: " + err.Error())
+		} else {
+			KubeConfig = usr.HomeDir + "/" + ".kube/config"
+		}
+	}
+
+	if PILOT_ADDR := os.Getenv("PILOT_ADDR"); PILOT_ADDR != "" {
+		ValidPilotAddr = PILOT_ADDR
+	} else {
+		// panic("PILOT_ADDR should be specified to pass the pilot address")
+		testutil.InitLocalPilotTestEnv(t)
+		ValidPilotAddr = util.MockPilotGrpcAddr
+	}
+
+	if INSTANCE_IP := os.Getenv("INSTANCE_IP"); INSTANCE_IP != "" {
+		LocalIPAddress = INSTANCE_IP
+	} else if LocalIPAddress = iputil.GetLocalIP(); LocalIPAddress == "" {
+		panic("Failed to get the local ip address, please check the network environment")
+	}
+
+	nodeInfo = &istioinfra.NodeInfo{
+		PodName:    TEST_POD_NAME,
+		Namespace:  NAMESPACE_DEFAULT,
+		InstanceIP: LocalIPAddress,
+	}
+
+	testXdsClient, err = istioinfra.NewXdsClient(ValidPilotAddr, nil, nodeInfo, KubeConfig)
+	if err != nil {
+		panic("Failed to prepare test, xds client creation failed: " + err.Error())
+	}
+}
+
+func TestNewCacheManager(t *testing.T) {
+	testCacheManager, err = NewCacheManager(testXdsClient)
+	if err != nil {
+		t.Errorf("Failed to create CacheManager: %s", err.Error())
+	}
+}
+
+// func TestAutoSync(t *testing.T) {
+//     testCacheManager.AutoSync()
+// }
+
+func TestPullMicroserviceInstance(t *testing.T) {
+	err = testCacheManager.pullMicroserviceInstance()
+	if err != nil {
+		t.Errorf("Failed to pull microservice instances: %s", err.Error())
+	}
+}
+
+// func TestMakeIPIndex(t *testing.T) {
+//     err := testCacheManager.MakeIPIndex()
+//     if err != nil {
+//         t.Errorf("Failed to make ip index: %s", err.Error())
+//     }
+// }
+
+func TestEndpointCache(t *testing.T) {
+	ec := EndpointCache{
+		cache: map[string]EndpointSubset{},
+	}
+
+	subset := EndpointSubset{
+		subsetName: "foo",
+		tags:       map[string]string{},
+		instances:  []*registry.MicroServiceInstance{},
+	}
+
+	defer func() {
+		if r := recover(); r != nil {
+			t.Error("should not panic")
+		}
+	}()
+
+	waitChannel := make(chan int)
+	for i := 0; i < 1000; i++ {
+		go func() {
+			ec.Set("foo", subset)
+			waitChannel <- 0
+
+		}()
+	}
+
+	for i := 0; i < 1000; i++ {
+		<-waitChannel
+	}
+}
diff --git a/plugins/registry/istiov2/registry.go b/plugins/registry/istiov2/registry.go
new file mode 100644
index 0000000..624dffc
--- /dev/null
+++ b/plugins/registry/istiov2/registry.go
@@ -0,0 +1,243 @@
+package istiov2
+
+import (
+	"fmt"
+	"log"
+	"os"
+	"strconv"
+	"strings"
+
+	apiv2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	apiv2endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
+	istioinfra "github.com/go-mesh/mesher/pkg/infras/istio"
+
+	"github.com/go-chassis/go-chassis/core/common"
+	"github.com/go-chassis/go-chassis/core/metadata"
+	"github.com/go-chassis/go-chassis/core/registry"
+	"github.com/go-chassis/go-chassis/pkg/util/iputil"
+	"github.com/go-chassis/go-chassis/pkg/util/tags"
+	"github.com/go-mesh/openlogging"
+)
+
+var (
+	//PodName is the name of the pod that mesher runs in
+	PodName string
+	//PodNamespace is the namespace which the pod belongs to
+	PodNamespace string
+	//InstanceIP is the IP of the pod (the IP of the first network adapter)
+	InstanceIP string
+)
+
+const (
+	PilotV2Registry = "pilotv2"
+)
+
+//ServiceDiscovery is the discovery service for istio pilot with xDS v2 API
+type ServiceDiscovery struct {
+	Name    string
+	client  *istioinfra.XdsClient
+	options registry.Options
+}
+
+//GetMicroServiceID returns the id of the micro service
+func (discovery *ServiceDiscovery) GetMicroServiceID(appID, microServiceName, version, env string) (string, error) {
+	return microServiceName, nil
+}
+
+//GetAllMicroServices returns all the micro services, which is mapped from xDS clusters
+func (discovery *ServiceDiscovery) GetAllMicroServices() ([]*registry.MicroService, error) {
+	clusters, err := discovery.client.CDS()
+	if err != nil {
+		return nil, err
+	}
+	microServices := []*registry.MicroService{}
+	for _, cluster := range clusters {
+		microServices = append(microServices, toMicroService(&cluster))
+	}
+	return microServices, nil
+}
+
+func toMicroService(cluster *apiv2.Cluster) *registry.MicroService {
+	svc := &registry.MicroService{}
+	svc.ServiceID = cluster.Name
+	svc.ServiceName = cluster.Name
+	svc.Version = common.DefaultVersion
+	svc.AppID = common.DefaultApp
+	svc.Level = "BACK"
+	svc.Status = "UP"
+	svc.Framework = &registry.Framework{
+		Name:    "Istio",
+		Version: common.LatestVersion,
+	}
+	svc.RegisterBy = metadata.PlatformRegistrationComponent
+
+	return svc
+}
+
+func toMicroServiceInstance(clusterName string, lbendpoint *apiv2endpoint.LbEndpoint, tags map[string]string) *registry.MicroServiceInstance {
+	socketAddress := lbendpoint.Endpoint.Address.GetSocketAddress()
+	addr := socketAddress.Address
+	port := socketAddress.GetPortValue()
+	portStr := strconv.FormatUint(uint64(port), 10)
+	msi := &registry.MicroServiceInstance{}
+	msi.InstanceID = addr + "_" + portStr
+	msi.HostName = clusterName
+	msi.DefaultEndpoint = addr + ":" + portStr
+	msi.EndpointsMap = map[string]string{
+		common.ProtocolRest: msi.DefaultEndpoint,
+	}
+	msi.DefaultProtocol = common.ProtocolRest
+	msi.Metadata = tags
+
+	return msi
+}
+
+//GetMicroService returns the micro service info
+func (discovery *ServiceDiscovery) GetMicroService(microServiceID string) (*registry.MicroService, error) {
+	// If the service is in the clusters, return it, or nil
+
+	clusters, err := discovery.client.CDS()
+	if err != nil {
+		return nil, err
+	}
+
+	var targetCluster apiv2.Cluster
+	for _, cluster := range clusters {
+		parts := strings.Split(cluster.Name, "|")
+		if len(parts) < 4 {
+			openlogging.GetLogger().Warnf("Invalid cluster name: %s", cluster.Name)
+			continue
+		}
+
+		svcName := parts[3]
+		if strings.Index(svcName, microServiceID+".") == 0 {
+			targetCluster = cluster
+			break
+		}
+	}
+
+	if targetCluster.Name == "" {
+		return nil, nil
+	}
+
+	return toMicroService(&targetCluster), nil
+}
+
+//GetMicroServiceInstances returns the instances of the micro service
+func (discovery *ServiceDiscovery) GetMicroServiceInstances(consumerID, providerID string) ([]*registry.MicroServiceInstance, error) {
+	// TODO Handle the registry.MicroserviceIndex cache
+	// TODO Handle the microServiceName
+	service, err := discovery.GetMicroService(providerID)
+	if err != nil {
+		return nil, err
+	}
+
+	loadAssignment, err := discovery.client.EDS(service.ServiceName)
+	if err != nil {
+		return nil, err
+	}
+
+	instances := []*registry.MicroServiceInstance{}
+	endpoints := loadAssignment.Endpoints
+	for _, item := range endpoints {
+		for _, lbendpoint := range item.LbEndpoints {
+			msi := toMicroServiceInstance(loadAssignment.ClusterName, &lbendpoint, nil) // The cluster without subset doesn't have tags
+			instances = append(instances, msi)
+		}
+	}
+
+	return instances, nil
+}
+
+//FindMicroServiceInstances returns the micro service's instances filtered with tags
+func (discovery *ServiceDiscovery) FindMicroServiceInstances(consumerID, microServiceName string, tags utiltags.Tags) ([]*registry.MicroServiceInstance, error) {
+	if tags.KV == nil || tags.Label == "" { // Chassis might pass empty tags
+		return discovery.GetMicroServiceInstances(consumerID, microServiceName)
+	}
+
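+	// Consult the local endpoint cache first; on a miss, query pilot directly and refresh the cache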
+	instances := simpleCache.GetWithTags(microServiceName, tags.KV)
+	if len(instances) == 0 {
+		lbendpoints, clusterName, err := discovery.client.GetEndpointsByTags(microServiceName, tags.KV)
+		if err != nil {
+			return nil, err
+		}
+
+		updateInstanceIndexCache(lbendpoints, clusterName, tags.KV)
+
+		instances = simpleCache.GetWithTags(microServiceName, tags.KV)
+		if instances == nil {
+			return nil, fmt.Errorf("Failed to find microservice instances of %s from cache", microServiceName)
+		}
+	}
+	return instances, nil
+}
+
+var cacheManager *CacheManager
+
+//AutoSync updates the services' info periodically in the background
+func (discovery *ServiceDiscovery) AutoSync() {
+	var err error
+	cacheManager, err = NewCacheManager(discovery.client)
+	if err != nil {
+		openlogging.GetLogger().Errorf("Failed to create cache manager, indexing will not work: %s", err.Error())
+	} else {
+		cacheManager.AutoSync()
+	}
+}
+
+//Close closes the discovery service
+func (discovery *ServiceDiscovery) Close() error {
+	return nil
+}
+
+//NewDiscoveryService creates the new ServiceDiscovery instance
+func NewDiscoveryService(options registry.Options) registry.ServiceDiscovery {
+	if len(options.Addrs) == 0 {
+		panic("Failed to create discovery service: Address not specified")
+	}
+	pilotAddr := options.Addrs[0]
+	nodeInfo := &istioinfra.NodeInfo{
+		PodName:    PodName,
+		Namespace:  PodNamespace,
+		InstanceIP: InstanceIP,
+	}
+	xdsClient, err := istioinfra.NewXdsClient(pilotAddr, options.TLSConfig, nodeInfo, options.ConfigPath)
+	if err != nil {
+		panic("Failed to create XDS client: " + err.Error())
+	}
+
+	discovery := &ServiceDiscovery{
+		client:  xdsClient,
+		Name:    PilotV2Registry,
+		options: options,
+	}
+
+	return discovery
+}
+
+func init() {
+	// Init the node info
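+	// POD_NAME, POD_NAMESPACE and INSTANCE_IP are typically injected via the
+	// Kubernetes downward API (metadata.name, metadata.namespace, status.podIP).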
+	PodName = os.Getenv("POD_NAME")
+	PodNamespace = os.Getenv("POD_NAMESPACE")
+	InstanceIP = os.Getenv("INSTANCE_IP")
+
+	// TODO Handle the default value
+	if PodName == "" {
+		PodName = "pod_name_default"
+	}
+	if PodNamespace == "" {
+		PodNamespace = "default"
+	}
+	if InstanceIP == "" {
+		log.Println("[WARN] Env var INSTANCE_IP not set, trying to get the instance IP from the local network; the service might not work properly.")
+		InstanceIP = iputil.GetLocalIP()
+		if InstanceIP == "" {
+			// Won't work without instance ip
+			panic("Failed to get instance ip")
+		}
+	}
+
+	registry.InstallServiceDiscovery(PilotV2Registry, NewDiscoveryService)
+}
diff --git a/plugins/registry/istiov2/registry_test.go b/plugins/registry/istiov2/registry_test.go
new file mode 100644
index 0000000..6bc0aca
--- /dev/null
+++ b/plugins/registry/istiov2/registry_test.go
@@ -0,0 +1,206 @@
+package istiov2
+
+import (
+	"os"
+	"strconv"
+	"testing"
+
+	apiv2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	apiv2core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+	apiv2endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
+	istioinfra "github.com/go-mesh/mesher/pkg/infras/istio"
+
+	"github.com/go-chassis/go-chassis/core/registry"
+	"github.com/go-chassis/go-chassis/pkg/util/tags"
+)
+
+var VaildServiceDiscovery registry.ServiceDiscovery
+var AllServices []*registry.MicroService
+
+func TestNewDiscoveryService(t *testing.T) {
+	options := registry.Options{
+		Addrs:      []string{ValidPilotAddr},
+		ConfigPath: KubeConfig,
+	}
+
+	// Explicitly set the env vars, though this is checked in the init of cache_test
+	os.Setenv("POD_NAME", TEST_POD_NAME)
+	os.Setenv("POD_NAMESPACE", NAMESPACE_DEFAULT)
+	os.Setenv("INSTANCE_IP", LocalIPAddress)
+
+	// No panic should happen
+	VaildServiceDiscovery = NewDiscoveryService(options)
+
+}
+
+// func TestAutoSync(t *testing.T) {
+//     archaius.Init()
+//     VaildServiceDiscovery.AutoSync()
+// }
+
+func TestEmptyPilotAddrs(t *testing.T) {
+	defer func() {
+		if err := recover(); err == nil {
+			t.Errorf("Panic should be caught")
+		}
+	}()
+
+	emptyAddrsOptions := registry.Options{
+		Addrs:      []string{},
+		ConfigPath: KubeConfig,
+	}
+	NewDiscoveryService(emptyAddrsOptions)
+}
+
+func TestGetAllMicroServices(t *testing.T) {
+	services, err := VaildServiceDiscovery.GetAllMicroServices()
+	if err != nil {
+		t.Errorf("Failed to get all micro services: %s", err.Error())
+	}
+
+	if len(services) == 0 {
+		t.Log("Warn: no micro services found")
+	}
+
+}
+
+func TestGetMicroServiceID(t *testing.T) {
+	serviceName := "pilotv2server"
+	msID, err := VaildServiceDiscovery.GetMicroServiceID("default", serviceName, "v3", "")
+	if err != nil {
+		t.Errorf("Failed to get micro service id: %s", err.Error())
+	}
+
+	if msID != serviceName {
+		t.Errorf("In pilotv2 discovery, msID should be equal to serviceName(%s != %s)", msID, serviceName)
+	}
+}
+
+func TestGetMicroService(t *testing.T) {
+	serviceName := "istio-pilot"
+	svc, err := VaildServiceDiscovery.GetMicroService(serviceName)
+	if err != nil {
+		t.Errorf("Failed to get micro service: %s", err.Error())
+	}
+	if svc == nil {
+		t.Errorf("istio-pilot service should not be nil")
+	}
+}
+
+func TestGetMicroServiceInstance(t *testing.T) {
+	// serviceName := "istio-pilot"
+	serviceName := "hello"
+	instances, err := VaildServiceDiscovery.GetMicroServiceInstances("pilotv2client", serviceName)
+	if err != nil {
+		t.Errorf("Failed to get micro service instances of %s: %s", serviceName, err.Error())
+	}
+	if len(instances) == 0 {
+		t.Errorf("%s's instances should not be empty", serviceName)
+	}
+}
+
+func TestFindMicroServiceInstances(t *testing.T) {
+	discovery, ok := VaildServiceDiscovery.(*ServiceDiscovery)
+	if !ok {
+		t.Errorf("Failed to convert discovery into type istiov2.ServiceDiscovery")
+		return
+	}
+	client := discovery.client
+
+	clusters, err := client.CDS()
+	if err != nil {
+		t.Errorf("Failed to test FindMicroServiceInstances, CDS failed: %s", err.Error())
+	}
+
+	var clusterWithSubset *istioinfra.XdsClusterInfo
+	for _, c := range clusters {
+		if info := istioinfra.ParseClusterName(c.Name); info != nil && info.Subset != "" {
+			clusterWithSubset = info
+		}
+	}
+
+	if clusterWithSubset != nil {
+		// empty tags ensure the target tags always match
+		emptyTags := utiltags.Tags{
+			KV:    map[string]string{},
+			Label: "",
+		}
+		instances, err := VaildServiceDiscovery.FindMicroServiceInstances("pilotv2client", clusterWithSubset.ServiceName, emptyTags)
+		if err != nil {
+			t.Errorf("Failed to FindMicroServiceInstances of %s: %s", clusterWithSubset.ServiceName, err.Error())
+		}
+		if len(instances) == 0 {
+			t.Logf("%s's service instances are empty\n", clusterWithSubset.ServiceName)
+			t.Log("Please check whether the DestinationRule and the corresponding pod labels match")
+		}
+	} else if len(clusters) != 0 {
+		t.Log("No clusters with subsets found")
+		targetCluster := clusters[0]
+
+		tags := utiltags.Tags{
+			KV: map[string]string{
+				"version": "v1",
+			},
+			Label: "version=v1",
+		}
+		_, err := VaildServiceDiscovery.FindMicroServiceInstances("pilotv2client", targetCluster.Name, tags)
+		if err == nil {
+			t.Errorf("Should catch an error when getting the endpoints of a cluster without tags")
+		}
+	}
+
+}
+
+func TestToMicroService(t *testing.T) {
+	cluster := &apiv2.Cluster{
+		Name: "pilotv2server",
+	}
+
+	svc := toMicroService(cluster)
+
+	if svc.ServiceID != cluster.Name {
+		t.Errorf("service id should be equal to cluster name(%s != %s)", svc.ServiceID, cluster.Name)
+	}
+}
+
+func TestToMicroServiceInstance(t *testing.T) {
+	lbendpoint := &apiv2endpoint.LbEndpoint{
+		Endpoint: &apiv2endpoint.Endpoint{
+			Address: &apiv2core.Address{
+				Address: &apiv2core.Address_SocketAddress{
+					SocketAddress: &apiv2core.SocketAddress{
+						Address: "192.168.0.10:8822",
+					},
+				},
+			},
+		},
+	}
+	clusterName := "pilotv2server"
+	tags := map[string]string{
+		"version": "v1",
+	}
+	msi := toMicroServiceInstance(clusterName, lbendpoint, tags)
+
+	socketAddr := lbendpoint.Endpoint.Address.GetSocketAddress()
+	addr := socketAddr.GetAddress()
+	port := socketAddr.GetPortValue()
+
+	if msi.InstanceID != addr+"_"+strconv.FormatUint(uint64(port), 10) {
+		t.Errorf("Invalid msi.InstanceID: %s should be equal to %s_%d", msi.InstanceID, addr, port)
+	}
+
+	if msi.HostName != clusterName {
+		t.Errorf("Invalid msi.HostName: %s should be equal to %s", msi.HostName, clusterName)
+	}
+
+	// Test if the tags match
+	if !tagsMatch(tags, msi.Metadata) {
+		t.Errorf("Tags do not match: %v should be a subset of %v", tags, msi.Metadata)
+	}
+}
+
+func TestClose(t *testing.T) {
+	if err := VaildServiceDiscovery.Close(); err != nil {
+		t.Error(err)
+	}
+}