delete inter cp
diff --git a/app/dubbo-cp/cmd/run.go b/app/dubbo-cp/cmd/run.go
index 9b5c275..71b8389 100644
--- a/app/dubbo-cp/cmd/run.go
+++ b/app/dubbo-cp/cmd/run.go
@@ -41,7 +41,6 @@
 	dp_server "github.com/apache/dubbo-kubernetes/pkg/dp-server"
 	"github.com/apache/dubbo-kubernetes/pkg/dubbo"
 	"github.com/apache/dubbo-kubernetes/pkg/hds"
-	"github.com/apache/dubbo-kubernetes/pkg/intercp"
 	"github.com/apache/dubbo-kubernetes/pkg/test"
 	"github.com/apache/dubbo-kubernetes/pkg/util/os"
 	dubbo_version "github.com/apache/dubbo-kubernetes/pkg/version"
@@ -139,10 +138,6 @@
 				runLog.Error(err, "unable to set up Diagnostics server")
 				return err
 			}
-			if err := intercp.Setup(rt); err != nil {
-				runLog.Error(err, "unable to set up Control Plane Intercommunication")
-				return err
-			}
 
 			if rt.GetMode() == core.Test {
 				if err := test.Setup(rt); err != nil {
diff --git a/pkg/intercp/catalog/catalog.go b/pkg/intercp/catalog/catalog.go
deleted file mode 100644
index 6d91fc7..0000000
--- a/pkg/intercp/catalog/catalog.go
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package catalog
-
-import (
-	"context"
-	"fmt"
-	"net"
-	"strconv"
-)
-
-import (
-	"github.com/pkg/errors"
-)
-
-type Instance struct {
-	Id          string `json:"id"`
-	Address     string `json:"address"`
-	InterCpPort uint16 `json:"interCpPort"`
-	Leader      bool   `json:"leader"`
-}
-
-func (i Instance) InterCpURL() string {
-	return fmt.Sprintf("grpcs://%s", net.JoinHostPort(i.Address, strconv.Itoa(int(i.InterCpPort))))
-}
-
-type Reader interface {
-	Instances(context.Context) ([]Instance, error)
-}
-
-type Catalog interface {
-	Reader
-	Replace(context.Context, []Instance) (bool, error)
-	ReplaceLeader(context.Context, Instance) error
-}
-
-var (
-	ErrNoLeader         = errors.New("leader not found")
-	ErrInstanceNotFound = errors.New("instance not found")
-)
-
-func Leader(ctx context.Context, catalog Catalog) (Instance, error) {
-	instances, err := catalog.Instances(ctx)
-	if err != nil {
-		return Instance{}, err
-	}
-	for _, instance := range instances {
-		if instance.Leader {
-			return instance, nil
-		}
-	}
-	return Instance{}, ErrNoLeader
-}
-
-func InstanceOfID(ctx context.Context, catalog Catalog, id string) (Instance, error) {
-	instances, err := catalog.Instances(ctx)
-	if err != nil {
-		return Instance{}, err
-	}
-	for _, instance := range instances {
-		if instance.Id == id {
-			return instance, nil
-		}
-	}
-	return Instance{}, ErrInstanceNotFound
-}
-
-type InstancesByID []Instance
-
-func (a InstancesByID) Len() int      { return len(a) }
-func (a InstancesByID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a InstancesByID) Less(i, j int) bool {
-	return a[i].Id < a[j].Id
-}
diff --git a/pkg/intercp/catalog/config_catalog.go b/pkg/intercp/catalog/config_catalog.go
deleted file mode 100644
index dd5f218..0000000
--- a/pkg/intercp/catalog/config_catalog.go
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package catalog
-
-import (
-	"context"
-	"encoding/json"
-	"sort"
-)
-
-import (
-	system_proto "github.com/apache/dubbo-kubernetes/api/system/v1alpha1"
-	"github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/system"
-	"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
-	"github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
-	"github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
-)
-
-type ConfigInstances struct {
-	Instances []Instance `json:"instances"`
-}
-
-var CatalogKey = model.ResourceKey{
-	Name: "cp-catalog",
-}
-
-type ConfigCatalog struct {
-	resManager manager.ResourceManager
-	ConfigCatalogReader
-}
-
-var _ Catalog = &ConfigCatalog{}
-
-func NewConfigCatalog(resManager manager.ResourceManager) Catalog {
-	return &ConfigCatalog{
-		resManager: resManager,
-		ConfigCatalogReader: ConfigCatalogReader{
-			resManager: resManager,
-		},
-	}
-}
-
-func (c *ConfigCatalog) Replace(ctx context.Context, instances []Instance) (bool, error) {
-	sort.Stable(InstancesByID(instances))
-	bytes, err := json.Marshal(ConfigInstances{
-		Instances: instances,
-	})
-	if err != nil {
-		return false, nil
-	}
-	newConfig := string(bytes)
-	var updated bool
-	err = manager.Upsert(ctx, c.resManager, CatalogKey, system.NewConfigResource(), func(resource model.Resource) error {
-		if resource.(*system.ConfigResource).Spec.GetConfig() != newConfig {
-			resource.(*system.ConfigResource).Spec = &system_proto.Config{
-				Config: newConfig,
-			}
-			updated = true
-		}
-		return nil
-	})
-	return updated, err
-}
-
-func (c *ConfigCatalog) ReplaceLeader(ctx context.Context, leader Instance) error {
-	return manager.Upsert(ctx, c.resManager, CatalogKey, system.NewConfigResource(), func(resource model.Resource) error {
-		instances := &ConfigInstances{}
-		if cfg := resource.(*system.ConfigResource).Spec.GetConfig(); cfg != "" {
-			if err := json.Unmarshal([]byte(cfg), instances); err != nil {
-				return err
-			}
-		}
-		leaderFound := false
-		for i, instance := range instances.Instances {
-			instance.Leader = false
-			if instance.Id == leader.Id {
-				instance.Leader = true
-				leaderFound = true
-			}
-			instances.Instances[i] = instance
-		}
-		if !leaderFound {
-			instances.Instances = append(instances.Instances, leader)
-			sort.Stable(InstancesByID(instances.Instances))
-		}
-		bytes, err := json.Marshal(instances)
-		if err != nil {
-			return err
-		}
-		resource.(*system.ConfigResource).Spec = &system_proto.Config{
-			Config: string(bytes),
-		}
-		return nil
-	})
-}
-
-type ConfigCatalogReader struct {
-	resManager manager.ReadOnlyResourceManager
-}
-
-var _ Reader = &ConfigCatalogReader{}
-
-func NewConfigCatalogReader(resManager manager.ReadOnlyResourceManager) Reader {
-	return &ConfigCatalogReader{
-		resManager: resManager,
-	}
-}
-
-func (c *ConfigCatalogReader) Instances(ctx context.Context) ([]Instance, error) {
-	cfg := system.NewConfigResource()
-	if err := c.resManager.Get(ctx, cfg, store.GetBy(CatalogKey)); err != nil {
-		if store.IsResourceNotFound(err) {
-			return []Instance{}, nil
-		}
-		return nil, err
-	}
-	var instances ConfigInstances
-	if err := json.Unmarshal([]byte(cfg.Spec.Config), &instances); err != nil {
-		return nil, err
-	}
-	return instances.Instances, nil
-}
diff --git a/pkg/intercp/catalog/heartbeat_component.go b/pkg/intercp/catalog/heartbeat_component.go
deleted file mode 100644
index b36c2f4..0000000
--- a/pkg/intercp/catalog/heartbeat_component.go
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package catalog
-
-import (
-	"context"
-	"time"
-)
-
-import (
-	"github.com/pkg/errors"
-)
-
-import (
-	system_proto "github.com/apache/dubbo-kubernetes/api/system/v1alpha1"
-	"github.com/apache/dubbo-kubernetes/pkg/core"
-	"github.com/apache/dubbo-kubernetes/pkg/core/runtime/component"
-)
-
-var heartbeatLog = core.Log.WithName("intercp").WithName("catalog").WithName("heartbeat")
-
-type heartbeatComponent struct {
-	catalog     Catalog
-	getClientFn GetClientFn
-	request     *system_proto.PingRequest
-	interval    time.Duration
-
-	leader *Instance
-}
-
-var _ component.Component = &heartbeatComponent{}
-
-type GetClientFn = func(url string) (system_proto.InterCpPingServiceClient, error)
-
-func NewHeartbeatComponent(
-	catalog Catalog,
-	instance Instance,
-	interval time.Duration,
-	newClientFn GetClientFn,
-) (component.Component, error) {
-	return &heartbeatComponent{
-		catalog: catalog,
-		request: &system_proto.PingRequest{
-			InstanceId:  instance.Id,
-			Address:     instance.Address,
-			InterCpPort: uint32(instance.InterCpPort),
-		},
-		getClientFn: newClientFn,
-		interval:    interval,
-	}, nil
-}
-
-func (h *heartbeatComponent) Start(stop <-chan struct{}) error {
-	heartbeatLog.Info("starting heartbeats to a leader")
-	ticker := time.NewTicker(h.interval)
-	ctx := context.Background()
-
-	for {
-		select {
-		case <-ticker.C:
-			if !h.heartbeat(ctx, true) {
-				continue
-			}
-		case <-stop:
-			// send final heartbeat to gracefully signal that the instance is going down
-			_ = h.heartbeat(ctx, false)
-			return nil
-		}
-	}
-}
-
-func (h *heartbeatComponent) heartbeat(ctx context.Context, ready bool) bool {
-	heartbeatLog := heartbeatLog.WithValues(
-		"instanceId", h.request.InstanceId,
-		"ready", ready,
-	)
-	if h.leader == nil {
-		if err := h.connectToLeader(ctx); err != nil {
-			heartbeatLog.Error(err, "could not connect to leader")
-			return false
-		}
-	}
-	if h.leader.Id == h.request.InstanceId {
-		heartbeatLog.V(1).Info("this instance is a leader. No need to send a heartbeat.")
-		return true
-	}
-	heartbeatLog = heartbeatLog.WithValues(
-		"leaderAddress", h.leader.Address,
-	)
-	heartbeatLog.V(1).Info("sending a heartbeat to a leader")
-	h.request.Ready = ready
-	client, err := h.getClientFn(h.leader.InterCpURL())
-	if err != nil {
-		heartbeatLog.Error(err, "could not get or create a client to a leader")
-		h.leader = nil
-		return false
-	}
-	resp, err := client.Ping(ctx, h.request)
-	if err != nil {
-		heartbeatLog.Error(err, "could not send a heartbeat to a leader")
-		h.leader = nil
-		return false
-	}
-	if !resp.Leader {
-		heartbeatLog.V(1).Info("instance responded that it is no longer a leader")
-		h.leader = nil
-	}
-	return true
-}
-
-func (h *heartbeatComponent) connectToLeader(ctx context.Context) error {
-	newLeader, err := Leader(ctx, h.catalog)
-	if err != nil {
-		return err
-	}
-	h.leader = &newLeader
-	if h.leader.Id == h.request.InstanceId {
-		return nil
-	}
-	heartbeatLog.Info("leader has changed. Creating connection to the new leader.",
-		"previousLeaderAddress", h.leader.Address,
-		"newLeaderAddress", newLeader.Leader,
-	)
-	_, err = h.getClientFn(h.leader.InterCpURL())
-	if err != nil {
-		return errors.Wrap(err, "could not create a client to a leader")
-	}
-	return nil
-}
-
-func (h *heartbeatComponent) NeedLeaderElection() bool {
-	return false
-}
diff --git a/pkg/intercp/catalog/heartbeats.go b/pkg/intercp/catalog/heartbeats.go
deleted file mode 100644
index df65376..0000000
--- a/pkg/intercp/catalog/heartbeats.go
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package catalog
-
-import (
-	"sync"
-)
-
-type Heartbeats struct {
-	instances map[Instance]struct{}
-	sync.Mutex
-}
-
-func NewHeartbeats() *Heartbeats {
-	return &Heartbeats{
-		instances: map[Instance]struct{}{},
-	}
-}
-
-func (h *Heartbeats) ResetAndCollect() []Instance {
-	h.Lock()
-	currentInstances := h.instances
-	h.instances = map[Instance]struct{}{}
-	h.Unlock()
-	var instances []Instance
-	for k := range currentInstances {
-		instances = append(instances, k)
-	}
-	return instances
-}
-
-func (h *Heartbeats) Add(instance Instance) {
-	h.Lock()
-	h.instances[instance] = struct{}{}
-	h.Unlock()
-}
-
-func (h *Heartbeats) Remove(instance Instance) {
-	h.Lock()
-	delete(h.instances, instance)
-	h.Unlock()
-}
diff --git a/pkg/intercp/catalog/server.go b/pkg/intercp/catalog/server.go
deleted file mode 100644
index db76ddc..0000000
--- a/pkg/intercp/catalog/server.go
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package catalog
-
-import (
-	"context"
-)
-
-import (
-	system_proto "github.com/apache/dubbo-kubernetes/api/system/v1alpha1"
-	"github.com/apache/dubbo-kubernetes/pkg/core"
-	"github.com/apache/dubbo-kubernetes/pkg/core/runtime/component"
-)
-
-var serverLog = core.Log.WithName("intercp").WithName("catalog").WithName("server")
-
-type server struct {
-	heartbeats *Heartbeats
-	leaderInfo component.LeaderInfo
-
-	system_proto.UnimplementedInterCpPingServiceServer
-}
-
-var _ system_proto.InterCpPingServiceServer = &server{}
-
-func NewServer(heartbeats *Heartbeats, leaderInfo component.LeaderInfo) system_proto.InterCpPingServiceServer {
-	return &server{
-		heartbeats: heartbeats,
-		leaderInfo: leaderInfo,
-	}
-}
-
-func (s *server) Ping(_ context.Context, request *system_proto.PingRequest) (*system_proto.PingResponse, error) {
-	serverLog.V(1).Info("received ping", "instanceID", request.InstanceId, "address", request.Address, "ready", request.Ready)
-	instance := Instance{
-		Id:          request.InstanceId,
-		Address:     request.Address,
-		InterCpPort: uint16(request.InterCpPort),
-		Leader:      false,
-	}
-	if request.Ready {
-		s.heartbeats.Add(instance)
-	} else {
-		s.heartbeats.Remove(instance)
-	}
-	return &system_proto.PingResponse{
-		Leader: s.leaderInfo.IsLeader(),
-	}, nil
-}
diff --git a/pkg/intercp/catalog/writer.go b/pkg/intercp/catalog/writer.go
deleted file mode 100644
index 4861b83..0000000
--- a/pkg/intercp/catalog/writer.go
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package catalog
-
-import (
-	"context"
-	"time"
-)
-
-import (
-	"github.com/apache/dubbo-kubernetes/pkg/core"
-	"github.com/apache/dubbo-kubernetes/pkg/core/runtime/component"
-)
-
-var writerLog = core.Log.WithName("intercp").WithName("catalog").WithName("writer")
-
-type catalogWriter struct {
-	catalog    Catalog
-	heartbeats *Heartbeats
-	instance   Instance
-	interval   time.Duration
-}
-
-var _ component.Component = &catalogWriter{}
-
-func NewWriter(
-	catalog Catalog,
-	heartbeats *Heartbeats,
-	instance Instance,
-	interval time.Duration,
-) (component.Component, error) {
-	leaderInstance := instance
-	leaderInstance.Leader = true
-	return &catalogWriter{
-		catalog:    catalog,
-		heartbeats: heartbeats,
-		instance:   leaderInstance,
-		interval:   interval,
-	}, nil
-}
-
-func (r *catalogWriter) Start(stop <-chan struct{}) error {
-	heartbeatLog.Info("starting catalog writer")
-	ctx := context.Background()
-	writerLog.Info("replacing a leader in the catalog")
-	if err := r.catalog.ReplaceLeader(ctx, r.instance); err != nil {
-		writerLog.Error(err, "could not replace leader") // continue, it will be replaced in ticker anyways
-	}
-	ticker := time.NewTicker(r.interval)
-	for {
-		select {
-		case <-ticker.C:
-			instances := r.heartbeats.ResetAndCollect()
-			instances = append(instances, r.instance)
-			updated, err := r.catalog.Replace(ctx, instances)
-			if err != nil {
-				writerLog.Error(err, "could not update catalog")
-				continue
-			}
-			if updated {
-				writerLog.Info("instances catalog updated", "instances", instances)
-			} else {
-				writerLog.V(1).Info("no need to update instances, because the catalog is the same", "instances", instances)
-			}
-		case <-stop:
-			return nil
-		}
-	}
-}
-
-func (r *catalogWriter) NeedLeaderElection() bool {
-	return true
-}
diff --git a/pkg/intercp/client/client.go b/pkg/intercp/client/client.go
deleted file mode 100644
index 7f3cbf5..0000000
--- a/pkg/intercp/client/client.go
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package client
-
-import (
-	"crypto/tls"
-	"crypto/x509"
-	"io"
-	"net/url"
-)
-
-import (
-	"github.com/pkg/errors"
-
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/connectivity"
-	"google.golang.org/grpc/credentials"
-	"google.golang.org/grpc/credentials/insecure"
-)
-
-type TLSConfig struct {
-	CaCert     x509.Certificate
-	ClientCert tls.Certificate
-}
-
-type Conn interface {
-	grpc.ClientConnInterface
-	io.Closer
-	GetState() connectivity.State
-}
-
-func New(serverURL string, tlsCfg *TLSConfig) (Conn, error) {
-	url, err := url.Parse(serverURL)
-	if err != nil {
-		return nil, err
-	}
-	var dialOpts []grpc.DialOption
-	switch url.Scheme {
-	case "grpc": // not used in production
-		dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
-	case "grpcs":
-		tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
-		if tlsCfg != nil {
-			cp := x509.NewCertPool()
-			cp.AddCert(&tlsCfg.CaCert)
-			tlsConfig.RootCAs = cp
-			tlsConfig.Certificates = []tls.Certificate{tlsCfg.ClientCert}
-		} else {
-			tlsConfig.InsecureSkipVerify = true
-		}
-		dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
-	default:
-		return nil, errors.Errorf("unsupported scheme %q. Use one of %s", url.Scheme, []string{"grpc", "grpcs"})
-	}
-	return grpc.Dial(url.Host, dialOpts...)
-}
diff --git a/pkg/intercp/client/pool.go b/pkg/intercp/client/pool.go
deleted file mode 100644
index ed4486f..0000000
--- a/pkg/intercp/client/pool.go
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package client
-
-import (
-	"context"
-	"sync"
-	"time"
-)
-
-import (
-	"github.com/pkg/errors"
-
-	"google.golang.org/grpc/connectivity"
-)
-
-import (
-	"github.com/apache/dubbo-kubernetes/pkg/core"
-)
-
-var poolLog = core.Log.WithName("intercp").WithName("client").WithName("pool")
-
-type accessedConn struct {
-	conn           Conn
-	url            string
-	lastAccessTime time.Time
-}
-
-// Pool keeps the list of clients to inter-cp servers.
-// Because the list of inter-cp servers changes in runtime, we need to properly manage the connections to them (initialize, share, close etc.)
-// Pool helps us to not reimplement this for every inter-cp service (catalog, envoyadmin, etc.)
-type Pool struct {
-	newConn      func(string, *TLSConfig) (Conn, error)
-	idleDeadline time.Duration // the time after which we close the connection if it was not fetched from the pool
-	now          func() time.Time
-	connections  map[string]*accessedConn
-	mut          sync.Mutex
-
-	tlsCfg *TLSConfig
-}
-
-var TLSNotConfigured = errors.New("tls config is not yet set")
-
-func NewPool(
-	newConn func(string, *TLSConfig) (Conn, error),
-	idleDeadline time.Duration,
-	now func() time.Time,
-) *Pool {
-	return &Pool{
-		newConn:      newConn,
-		idleDeadline: idleDeadline,
-		now:          now,
-		connections:  map[string]*accessedConn{},
-		mut:          sync.Mutex{},
-	}
-}
-
-func (c *Pool) Client(serverURL string) (Conn, error) {
-	c.mut.Lock()
-	defer c.mut.Unlock()
-	if c.tlsCfg == nil {
-		return nil, TLSNotConfigured
-	}
-	ac, ok := c.connections[serverURL]
-	createNewConnection := !ok
-	if ok && ac.conn.GetState() == connectivity.TransientFailure {
-		createNewConnection = true
-		poolLog.Info("closing broken connection", "url", serverURL)
-		if err := ac.conn.Close(); err != nil {
-			poolLog.Error(err, "cannot close the connection", "url", serverURL)
-		}
-	}
-	if createNewConnection {
-		poolLog.Info("creating new connection", "url", serverURL)
-		conn, err := c.newConn(serverURL, c.tlsCfg)
-		if err != nil {
-			return nil, err
-		}
-		ac = &accessedConn{
-			conn: conn,
-			url:  serverURL,
-		}
-	}
-	ac.lastAccessTime = c.now()
-	c.connections[serverURL] = ac
-	return ac.conn, nil
-}
-
-// SetTLSConfig can configure TLS in runtime.
-// Because CA of the inter-cp server is managed by the CP in the runtime we cannot configure it when we create the pool.
-func (c *Pool) SetTLSConfig(tlsCfg *TLSConfig) {
-	c.mut.Lock()
-	c.tlsCfg = tlsCfg
-	c.mut.Unlock()
-}
-
-func (c *Pool) StartCleanup(ctx context.Context, ticker *time.Ticker) {
-	for {
-		select {
-		case now := <-ticker.C:
-			c.cleanup(now)
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
-func (c *Pool) cleanup(now time.Time) {
-	c.mut.Lock()
-	defer c.mut.Unlock()
-	for url, accessedConn := range c.connections {
-		if now.Sub(accessedConn.lastAccessTime) > c.idleDeadline {
-			poolLog.Info("closing connection due to lack of activity", "url", accessedConn.url)
-			if err := accessedConn.conn.Close(); err != nil {
-				poolLog.Error(err, "cannot close the connection", "url", accessedConn.url)
-			}
-			delete(c.connections, url)
-		}
-	}
-}
diff --git a/pkg/intercp/components.go b/pkg/intercp/components.go
deleted file mode 100644
index 8833b2a..0000000
--- a/pkg/intercp/components.go
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package intercp
-
-import (
-	"time"
-)
-
-import (
-	mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
-	"github.com/apache/dubbo-kubernetes/pkg/core"
-	"github.com/apache/dubbo-kubernetes/pkg/core/runtime"
-	"github.com/apache/dubbo-kubernetes/pkg/intercp/client"
-	"github.com/apache/dubbo-kubernetes/pkg/intercp/envoyadmin"
-)
-
-var log = core.Log.WithName("inter-cp")
-
-func Setup(rt runtime.Runtime) error {
-	return nil
-}
-
-func DefaultClientPool() *client.Pool {
-
-	return client.NewPool(client.New, 5*time.Minute, core.Now)
-}
-
-func PooledEnvoyAdminClientFn(pool *client.Pool) envoyadmin.NewClientFn {
-	return func(url string) (mesh_proto.InterCPEnvoyAdminForwardServiceClient, error) {
-		conn, err := pool.Client(url)
-		if err != nil {
-			return nil, err
-		}
-		return mesh_proto.NewInterCPEnvoyAdminForwardServiceClient(conn), nil
-	}
-}
diff --git a/pkg/intercp/envoyadmin/forwarding_dds_client.go b/pkg/intercp/envoyadmin/forwarding_dds_client.go
deleted file mode 100644
index 604cddb..0000000
--- a/pkg/intercp/envoyadmin/forwarding_dds_client.go
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package envoyadmin
-
-import (
-	"context"
-	"fmt"
-	"reflect"
-)
-
-import (
-	"github.com/pkg/errors"
-)
-
-import (
-	mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
-	"github.com/apache/dubbo-kubernetes/pkg/core"
-	"github.com/apache/dubbo-kubernetes/pkg/core/admin"
-	core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
-	core_system "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/system"
-	"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
-	core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
-	core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
-	"github.com/apache/dubbo-kubernetes/pkg/dds/service"
-	"github.com/apache/dubbo-kubernetes/pkg/intercp/catalog"
-)
-
-var clientLog = core.Log.WithName("intercp").WithName("envoyadmin").WithName("client")
-
-type NewClientFn = func(url string) (mesh_proto.InterCPEnvoyAdminForwardServiceClient, error)
-
-type forwardingKdsEnvoyAdminClient struct {
-	resManager     manager.ReadOnlyResourceManager
-	cat            catalog.Catalog
-	instanceID     string
-	newClientFn    NewClientFn
-	fallbackClient admin.EnvoyAdminClient
-}
-
-// NewForwardingEnvoyAdminClient returns EnvoyAdminClient which is only used on Global CP in multizone environment.
-// It forwards the request to an instance of the Global CP to which Zone CP of given DPP is connected.
-//
-// For example:
-// We have 2 instances of Global CP (ins-1, ins-2). Dataplane "backend" is in zone "east".
-// The leader CP of zone "east" is connected to ins-1.
-// If we execute config dump for "backend" on ins-1, we follow the regular flow of pkg/envoy/admin/kds_client.go
-// If we execute config dump for "backend" on ins-2, we forward the request to ins-1 and then execute the regular flow.
-func NewForwardingEnvoyAdminClient(
-	resManager manager.ReadOnlyResourceManager,
-	cat catalog.Catalog,
-	instanceID string,
-	newClientFn NewClientFn,
-	fallbackClient admin.EnvoyAdminClient,
-) admin.EnvoyAdminClient {
-	return &forwardingKdsEnvoyAdminClient{
-		resManager:     resManager,
-		cat:            cat,
-		instanceID:     instanceID,
-		newClientFn:    newClientFn,
-		fallbackClient: fallbackClient,
-	}
-}
-
-var _ admin.EnvoyAdminClient = &forwardingKdsEnvoyAdminClient{}
-
-func (f *forwardingKdsEnvoyAdminClient) PostQuit(context.Context, *core_mesh.DataplaneResource) error {
-	panic("not implemented")
-}
-
-func (f *forwardingKdsEnvoyAdminClient) ConfigDump(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
-	instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.ConfigDumpRPC)
-	if err != nil {
-		return nil, err
-	}
-	f.logIntendedAction(proxy, instanceID)
-	if instanceID == f.instanceID {
-		return f.fallbackClient.ConfigDump(ctx, proxy)
-	}
-	client, err := f.clientForInstanceID(ctx, instanceID)
-	if err != nil {
-		return nil, err
-	}
-	req := &mesh_proto.XDSConfigRequest{
-		ResourceType: string(proxy.Descriptor().Name),
-		ResourceName: proxy.GetMeta().GetName(),
-		ResourceMesh: proxy.GetMeta().GetMesh(),
-	}
-	resp, err := client.XDSConfig(ctx, req)
-	if err != nil {
-		return nil, err
-	}
-	return resp.GetConfig(), nil
-}
-
-func (f *forwardingKdsEnvoyAdminClient) Stats(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
-	instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.StatsRPC)
-	if err != nil {
-		return nil, err
-	}
-	f.logIntendedAction(proxy, instanceID)
-	if instanceID == f.instanceID {
-		return f.fallbackClient.Stats(ctx, proxy)
-	}
-	client, err := f.clientForInstanceID(ctx, instanceID)
-	if err != nil {
-		return nil, err
-	}
-	req := &mesh_proto.StatsRequest{
-		ResourceType: string(proxy.Descriptor().Name),
-		ResourceName: proxy.GetMeta().GetName(),
-		ResourceMesh: proxy.GetMeta().GetMesh(),
-	}
-	resp, err := client.Stats(ctx, req)
-	if err != nil {
-		return nil, err
-	}
-	return resp.GetStats(), nil
-}
-
-func (f *forwardingKdsEnvoyAdminClient) Clusters(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
-	instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.ClustersRPC)
-	if err != nil {
-		return nil, err
-	}
-	f.logIntendedAction(proxy, instanceID)
-	if instanceID == f.instanceID {
-		return f.fallbackClient.Clusters(ctx, proxy)
-	}
-	client, err := f.clientForInstanceID(ctx, instanceID)
-	if err != nil {
-		return nil, err
-	}
-	req := &mesh_proto.ClustersRequest{
-		ResourceType: string(proxy.Descriptor().Name),
-		ResourceName: proxy.GetMeta().GetName(),
-		ResourceMesh: proxy.GetMeta().GetMesh(),
-	}
-	resp, err := client.Clusters(ctx, req)
-	if err != nil {
-		return nil, err
-	}
-	return resp.GetClusters(), nil
-}
-
-func (f *forwardingKdsEnvoyAdminClient) logIntendedAction(proxy core_model.ResourceWithAddress, instanceID string) {
-	log := clientLog.WithValues(
-		"name", proxy.GetMeta().GetName(),
-		"mesh", proxy.GetMeta().GetMesh(),
-		"type", proxy.Descriptor().Name,
-		"instanceID", instanceID,
-	)
-	if instanceID == f.instanceID {
-		log.V(1).Info("zone CP of the resource is connected to this Global CP instance. Executing operation")
-	} else {
-		log.V(1).Info("zone CP of the resource is connected to other Global CP instance. Forwarding the request")
-	}
-}
-
-func (f *forwardingKdsEnvoyAdminClient) globalInstanceID(ctx context.Context, zone string, rpcName string) (string, error) {
-	zoneInsightRes := core_system.NewZoneInsightResource()
-	if err := f.resManager.Get(ctx, zoneInsightRes, core_store.GetByKey(zone, core_model.NoMesh)); err != nil {
-		return "", err
-	}
-	streams := zoneInsightRes.Spec.GetEnvoyAdminStreams()
-	var globalInstanceID string
-	switch rpcName {
-	case service.ConfigDumpRPC:
-		globalInstanceID = streams.GetConfigDumpGlobalInstanceId()
-	case service.StatsRPC:
-		globalInstanceID = streams.GetStatsGlobalInstanceId()
-	case service.ClustersRPC:
-		globalInstanceID = streams.GetClustersGlobalInstanceId()
-	default:
-		return "", errors.Errorf("invalid operation %s", rpcName)
-	}
-	if globalInstanceID == "" {
-		return "", &StreamNotConnectedError{rpcName: rpcName}
-	}
-	return globalInstanceID, nil
-}
-
-func (f *forwardingKdsEnvoyAdminClient) clientForInstanceID(ctx context.Context, instanceID string) (mesh_proto.InterCPEnvoyAdminForwardServiceClient, error) {
-	instance, err := catalog.InstanceOfID(ctx, f.cat, instanceID)
-	if err != nil {
-		return nil, err
-	}
-	return f.newClientFn(instance.InterCpURL())
-}
-
-type StreamNotConnectedError struct {
-	rpcName string
-}
-
-func (e *StreamNotConnectedError) Error() string {
-	return fmt.Sprintf("stream to execute %s operations is not yet connected", e.rpcName)
-}
-
-func (e *StreamNotConnectedError) Is(err error) bool {
-	return reflect.TypeOf(e) == reflect.TypeOf(err)
-}
diff --git a/pkg/intercp/envoyadmin/server.go b/pkg/intercp/envoyadmin/server.go
deleted file mode 100644
index b1096e6..0000000
--- a/pkg/intercp/envoyadmin/server.go
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package envoyadmin
-
-import (
-	"context"
-	"errors"
-)
-
-import (
-	mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
-	"github.com/apache/dubbo-kubernetes/pkg/core"
-	"github.com/apache/dubbo-kubernetes/pkg/core/admin"
-	"github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
-	"github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
-	"github.com/apache/dubbo-kubernetes/pkg/core/resources/registry"
-	core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
-)
-
-var serverLog = core.Log.WithName("intercp").WithName("catalog").WithName("server")
-
-type server struct {
-	adminClient admin.EnvoyAdminClient
-	resManager  manager.ReadOnlyResourceManager
-	mesh_proto.UnimplementedInterCPEnvoyAdminForwardServiceServer
-}
-
-var _ mesh_proto.InterCPEnvoyAdminForwardServiceServer = &server{}
-
-func NewServer(adminClient admin.EnvoyAdminClient, resManager manager.ReadOnlyResourceManager) mesh_proto.InterCPEnvoyAdminForwardServiceServer {
-	return &server{
-		adminClient: adminClient,
-		resManager:  resManager,
-	}
-}
-
-func (s *server) XDSConfig(ctx context.Context, req *mesh_proto.XDSConfigRequest) (*mesh_proto.XDSConfigResponse, error) {
-	serverLog.V(1).Info("received forwarded request", "operation", "XDSConfig", "request", req)
-	resWithAddr, err := s.resWithAddress(ctx, req.ResourceType, req.ResourceName, req.ResourceMesh)
-	if err != nil {
-		return nil, err
-	}
-	configDump, err := s.adminClient.ConfigDump(ctx, resWithAddr)
-	if err != nil {
-		return nil, err
-	}
-	return &mesh_proto.XDSConfigResponse{
-		Result: &mesh_proto.XDSConfigResponse_Config{
-			Config: configDump,
-		},
-	}, nil
-}
-
-func (s *server) Stats(ctx context.Context, req *mesh_proto.StatsRequest) (*mesh_proto.StatsResponse, error) {
-	serverLog.V(1).Info("received forwarded request", "operation", "Stats", "request", req)
-	resWithAddr, err := s.resWithAddress(ctx, req.ResourceType, req.ResourceName, req.ResourceMesh)
-	if err != nil {
-		return nil, err
-	}
-	stats, err := s.adminClient.Stats(ctx, resWithAddr)
-	if err != nil {
-		return nil, err
-	}
-	return &mesh_proto.StatsResponse{
-		Result: &mesh_proto.StatsResponse_Stats{
-			Stats: stats,
-		},
-	}, nil
-}
-
-func (s *server) Clusters(ctx context.Context, req *mesh_proto.ClustersRequest) (*mesh_proto.ClustersResponse, error) {
-	serverLog.V(1).Info("received forwarded request", "operation", "Clusters", "request", req)
-	resWithAddr, err := s.resWithAddress(ctx, req.ResourceType, req.ResourceName, req.ResourceMesh)
-	if err != nil {
-		return nil, err
-	}
-	clusters, err := s.adminClient.Clusters(ctx, resWithAddr)
-	if err != nil {
-		return nil, err
-	}
-	return &mesh_proto.ClustersResponse{
-		Result: &mesh_proto.ClustersResponse_Clusters{
-			Clusters: clusters,
-		},
-	}, nil
-}
-
-func (s *server) resWithAddress(ctx context.Context, typ, name, mesh string) (model.ResourceWithAddress, error) {
-	obj, err := registry.Global().NewObject(model.ResourceType(typ))
-	if err != nil {
-		return nil, err
-	}
-	if err := s.resManager.Get(ctx, obj, core_store.GetByKey(name, mesh)); err != nil {
-		return nil, err
-	}
-	resourceWithAddr, ok := obj.(model.ResourceWithAddress)
-	if !ok {
-		return nil, errors.New("invalid resource type")
-	}
-	return resourceWithAddr, nil
-}
diff --git a/pkg/intercp/server/server.go b/pkg/intercp/server/server.go
deleted file mode 100644
index 59f7356..0000000
--- a/pkg/intercp/server/server.go
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package server
-
-import (
-	"fmt"
-	"net"
-	"net/http"
-	"time"
-)
-
-import (
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/keepalive"
-)
-
-import (
-	"github.com/apache/dubbo-kubernetes/pkg/config/intercp"
-	"github.com/apache/dubbo-kubernetes/pkg/core"
-	"github.com/apache/dubbo-kubernetes/pkg/core/runtime/component"
-)
-
-var log = core.Log.WithName("intercp-server")
-
-const (
-	grpcMaxConcurrentStreams = 1000000
-	grpcKeepAliveTime        = 15 * time.Second
-)
-
-type InterCpServer struct {
-	config     intercp.InterCpServerConfig
-	grpcServer *grpc.Server
-	instanceId string
-}
-
-var _ component.Component = &InterCpServer{}
-
-func New(
-	config intercp.InterCpServerConfig,
-	instanceId string,
-) (*InterCpServer, error) {
-	grpcOptions := []grpc.ServerOption{
-		grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
-		grpc.KeepaliveParams(keepalive.ServerParameters{
-			Time:    grpcKeepAliveTime,
-			Timeout: grpcKeepAliveTime,
-		}),
-		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
-			MinTime:             grpcKeepAliveTime,
-			PermitWithoutStream: true,
-		}),
-	}
-
-	grpcOptions = append(grpcOptions)
-	grpcServer := grpc.NewServer(grpcOptions...)
-
-	return &InterCpServer{
-		config:     config,
-		grpcServer: grpcServer,
-		instanceId: instanceId,
-	}, nil
-}
-
-func (d *InterCpServer) Start(stop <-chan struct{}) error {
-	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", d.config.Port))
-	if err != nil {
-		return err
-	}
-	log := log.WithValues(
-		"instanceId",
-		d.instanceId,
-	)
-
-	errChan := make(chan error)
-	go func() {
-		defer close(errChan)
-		if err := d.grpcServer.Serve(lis); err != nil {
-			if err != http.ErrServerClosed {
-				log.Error(err, "terminated with an error")
-				errChan <- err
-				return
-			}
-		}
-		log.Info("terminated normally")
-	}()
-	log.Info("starting", "interface", "0.0.0.0", "port", d.config.Port, "tls", true)
-
-	select {
-	case <-stop:
-		log.Info("stopping gracefully")
-		d.grpcServer.GracefulStop()
-		log.Info("stopped")
-		return nil
-	case err := <-errChan:
-		return err
-	}
-}
-
-func (d *InterCpServer) NeedLeaderElection() bool {
-	return false
-}
-
-func (d *InterCpServer) GrpcServer() *grpc.Server {
-	return d.grpcServer
-}