delete inter cp
diff --git a/app/dubbo-cp/cmd/run.go b/app/dubbo-cp/cmd/run.go
index 9b5c275..71b8389 100644
--- a/app/dubbo-cp/cmd/run.go
+++ b/app/dubbo-cp/cmd/run.go
@@ -41,7 +41,6 @@
dp_server "github.com/apache/dubbo-kubernetes/pkg/dp-server"
"github.com/apache/dubbo-kubernetes/pkg/dubbo"
"github.com/apache/dubbo-kubernetes/pkg/hds"
- "github.com/apache/dubbo-kubernetes/pkg/intercp"
"github.com/apache/dubbo-kubernetes/pkg/test"
"github.com/apache/dubbo-kubernetes/pkg/util/os"
dubbo_version "github.com/apache/dubbo-kubernetes/pkg/version"
@@ -139,10 +138,6 @@
runLog.Error(err, "unable to set up Diagnostics server")
return err
}
- if err := intercp.Setup(rt); err != nil {
- runLog.Error(err, "unable to set up Control Plane Intercommunication")
- return err
- }
if rt.GetMode() == core.Test {
if err := test.Setup(rt); err != nil {
diff --git a/pkg/intercp/catalog/catalog.go b/pkg/intercp/catalog/catalog.go
deleted file mode 100644
index 6d91fc7..0000000
--- a/pkg/intercp/catalog/catalog.go
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package catalog
-
-import (
- "context"
- "fmt"
- "net"
- "strconv"
-)
-
-import (
- "github.com/pkg/errors"
-)
-
-type Instance struct {
- Id string `json:"id"`
- Address string `json:"address"`
- InterCpPort uint16 `json:"interCpPort"`
- Leader bool `json:"leader"`
-}
-
-func (i Instance) InterCpURL() string {
- return fmt.Sprintf("grpcs://%s", net.JoinHostPort(i.Address, strconv.Itoa(int(i.InterCpPort))))
-}
-
-type Reader interface {
- Instances(context.Context) ([]Instance, error)
-}
-
-type Catalog interface {
- Reader
- Replace(context.Context, []Instance) (bool, error)
- ReplaceLeader(context.Context, Instance) error
-}
-
-var (
- ErrNoLeader = errors.New("leader not found")
- ErrInstanceNotFound = errors.New("instance not found")
-)
-
-func Leader(ctx context.Context, catalog Catalog) (Instance, error) {
- instances, err := catalog.Instances(ctx)
- if err != nil {
- return Instance{}, err
- }
- for _, instance := range instances {
- if instance.Leader {
- return instance, nil
- }
- }
- return Instance{}, ErrNoLeader
-}
-
-func InstanceOfID(ctx context.Context, catalog Catalog, id string) (Instance, error) {
- instances, err := catalog.Instances(ctx)
- if err != nil {
- return Instance{}, err
- }
- for _, instance := range instances {
- if instance.Id == id {
- return instance, nil
- }
- }
- return Instance{}, ErrInstanceNotFound
-}
-
-type InstancesByID []Instance
-
-func (a InstancesByID) Len() int { return len(a) }
-func (a InstancesByID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a InstancesByID) Less(i, j int) bool {
- return a[i].Id < a[j].Id
-}
diff --git a/pkg/intercp/catalog/config_catalog.go b/pkg/intercp/catalog/config_catalog.go
deleted file mode 100644
index dd5f218..0000000
--- a/pkg/intercp/catalog/config_catalog.go
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package catalog
-
-import (
- "context"
- "encoding/json"
- "sort"
-)
-
-import (
- system_proto "github.com/apache/dubbo-kubernetes/api/system/v1alpha1"
- "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/system"
- "github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
- "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
- "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
-)
-
-type ConfigInstances struct {
- Instances []Instance `json:"instances"`
-}
-
-var CatalogKey = model.ResourceKey{
- Name: "cp-catalog",
-}
-
-type ConfigCatalog struct {
- resManager manager.ResourceManager
- ConfigCatalogReader
-}
-
-var _ Catalog = &ConfigCatalog{}
-
-func NewConfigCatalog(resManager manager.ResourceManager) Catalog {
- return &ConfigCatalog{
- resManager: resManager,
- ConfigCatalogReader: ConfigCatalogReader{
- resManager: resManager,
- },
- }
-}
-
-func (c *ConfigCatalog) Replace(ctx context.Context, instances []Instance) (bool, error) {
- sort.Stable(InstancesByID(instances))
- bytes, err := json.Marshal(ConfigInstances{
- Instances: instances,
- })
- if err != nil {
- return false, nil
- }
- newConfig := string(bytes)
- var updated bool
- err = manager.Upsert(ctx, c.resManager, CatalogKey, system.NewConfigResource(), func(resource model.Resource) error {
- if resource.(*system.ConfigResource).Spec.GetConfig() != newConfig {
- resource.(*system.ConfigResource).Spec = &system_proto.Config{
- Config: newConfig,
- }
- updated = true
- }
- return nil
- })
- return updated, err
-}
-
-func (c *ConfigCatalog) ReplaceLeader(ctx context.Context, leader Instance) error {
- return manager.Upsert(ctx, c.resManager, CatalogKey, system.NewConfigResource(), func(resource model.Resource) error {
- instances := &ConfigInstances{}
- if cfg := resource.(*system.ConfigResource).Spec.GetConfig(); cfg != "" {
- if err := json.Unmarshal([]byte(cfg), instances); err != nil {
- return err
- }
- }
- leaderFound := false
- for i, instance := range instances.Instances {
- instance.Leader = false
- if instance.Id == leader.Id {
- instance.Leader = true
- leaderFound = true
- }
- instances.Instances[i] = instance
- }
- if !leaderFound {
- instances.Instances = append(instances.Instances, leader)
- sort.Stable(InstancesByID(instances.Instances))
- }
- bytes, err := json.Marshal(instances)
- if err != nil {
- return err
- }
- resource.(*system.ConfigResource).Spec = &system_proto.Config{
- Config: string(bytes),
- }
- return nil
- })
-}
-
-type ConfigCatalogReader struct {
- resManager manager.ReadOnlyResourceManager
-}
-
-var _ Reader = &ConfigCatalogReader{}
-
-func NewConfigCatalogReader(resManager manager.ReadOnlyResourceManager) Reader {
- return &ConfigCatalogReader{
- resManager: resManager,
- }
-}
-
-func (c *ConfigCatalogReader) Instances(ctx context.Context) ([]Instance, error) {
- cfg := system.NewConfigResource()
- if err := c.resManager.Get(ctx, cfg, store.GetBy(CatalogKey)); err != nil {
- if store.IsResourceNotFound(err) {
- return []Instance{}, nil
- }
- return nil, err
- }
- var instances ConfigInstances
- if err := json.Unmarshal([]byte(cfg.Spec.Config), &instances); err != nil {
- return nil, err
- }
- return instances.Instances, nil
-}
diff --git a/pkg/intercp/catalog/heartbeat_component.go b/pkg/intercp/catalog/heartbeat_component.go
deleted file mode 100644
index b36c2f4..0000000
--- a/pkg/intercp/catalog/heartbeat_component.go
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package catalog
-
-import (
- "context"
- "time"
-)
-
-import (
- "github.com/pkg/errors"
-)
-
-import (
- system_proto "github.com/apache/dubbo-kubernetes/api/system/v1alpha1"
- "github.com/apache/dubbo-kubernetes/pkg/core"
- "github.com/apache/dubbo-kubernetes/pkg/core/runtime/component"
-)
-
-var heartbeatLog = core.Log.WithName("intercp").WithName("catalog").WithName("heartbeat")
-
-type heartbeatComponent struct {
- catalog Catalog
- getClientFn GetClientFn
- request *system_proto.PingRequest
- interval time.Duration
-
- leader *Instance
-}
-
-var _ component.Component = &heartbeatComponent{}
-
-type GetClientFn = func(url string) (system_proto.InterCpPingServiceClient, error)
-
-func NewHeartbeatComponent(
- catalog Catalog,
- instance Instance,
- interval time.Duration,
- newClientFn GetClientFn,
-) (component.Component, error) {
- return &heartbeatComponent{
- catalog: catalog,
- request: &system_proto.PingRequest{
- InstanceId: instance.Id,
- Address: instance.Address,
- InterCpPort: uint32(instance.InterCpPort),
- },
- getClientFn: newClientFn,
- interval: interval,
- }, nil
-}
-
-func (h *heartbeatComponent) Start(stop <-chan struct{}) error {
- heartbeatLog.Info("starting heartbeats to a leader")
- ticker := time.NewTicker(h.interval)
- ctx := context.Background()
-
- for {
- select {
- case <-ticker.C:
- if !h.heartbeat(ctx, true) {
- continue
- }
- case <-stop:
- // send final heartbeat to gracefully signal that the instance is going down
- _ = h.heartbeat(ctx, false)
- return nil
- }
- }
-}
-
-func (h *heartbeatComponent) heartbeat(ctx context.Context, ready bool) bool {
- heartbeatLog := heartbeatLog.WithValues(
- "instanceId", h.request.InstanceId,
- "ready", ready,
- )
- if h.leader == nil {
- if err := h.connectToLeader(ctx); err != nil {
- heartbeatLog.Error(err, "could not connect to leader")
- return false
- }
- }
- if h.leader.Id == h.request.InstanceId {
- heartbeatLog.V(1).Info("this instance is a leader. No need to send a heartbeat.")
- return true
- }
- heartbeatLog = heartbeatLog.WithValues(
- "leaderAddress", h.leader.Address,
- )
- heartbeatLog.V(1).Info("sending a heartbeat to a leader")
- h.request.Ready = ready
- client, err := h.getClientFn(h.leader.InterCpURL())
- if err != nil {
- heartbeatLog.Error(err, "could not get or create a client to a leader")
- h.leader = nil
- return false
- }
- resp, err := client.Ping(ctx, h.request)
- if err != nil {
- heartbeatLog.Error(err, "could not send a heartbeat to a leader")
- h.leader = nil
- return false
- }
- if !resp.Leader {
- heartbeatLog.V(1).Info("instance responded that it is no longer a leader")
- h.leader = nil
- }
- return true
-}
-
-func (h *heartbeatComponent) connectToLeader(ctx context.Context) error {
- newLeader, err := Leader(ctx, h.catalog)
- if err != nil {
- return err
- }
- h.leader = &newLeader
- if h.leader.Id == h.request.InstanceId {
- return nil
- }
- heartbeatLog.Info("leader has changed. Creating connection to the new leader.",
- "previousLeaderAddress", h.leader.Address,
- "newLeaderAddress", newLeader.Leader,
- )
- _, err = h.getClientFn(h.leader.InterCpURL())
- if err != nil {
- return errors.Wrap(err, "could not create a client to a leader")
- }
- return nil
-}
-
-func (h *heartbeatComponent) NeedLeaderElection() bool {
- return false
-}
diff --git a/pkg/intercp/catalog/heartbeats.go b/pkg/intercp/catalog/heartbeats.go
deleted file mode 100644
index df65376..0000000
--- a/pkg/intercp/catalog/heartbeats.go
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package catalog
-
-import (
- "sync"
-)
-
-type Heartbeats struct {
- instances map[Instance]struct{}
- sync.Mutex
-}
-
-func NewHeartbeats() *Heartbeats {
- return &Heartbeats{
- instances: map[Instance]struct{}{},
- }
-}
-
-func (h *Heartbeats) ResetAndCollect() []Instance {
- h.Lock()
- currentInstances := h.instances
- h.instances = map[Instance]struct{}{}
- h.Unlock()
- var instances []Instance
- for k := range currentInstances {
- instances = append(instances, k)
- }
- return instances
-}
-
-func (h *Heartbeats) Add(instance Instance) {
- h.Lock()
- h.instances[instance] = struct{}{}
- h.Unlock()
-}
-
-func (h *Heartbeats) Remove(instance Instance) {
- h.Lock()
- delete(h.instances, instance)
- h.Unlock()
-}
diff --git a/pkg/intercp/catalog/server.go b/pkg/intercp/catalog/server.go
deleted file mode 100644
index db76ddc..0000000
--- a/pkg/intercp/catalog/server.go
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package catalog
-
-import (
- "context"
-)
-
-import (
- system_proto "github.com/apache/dubbo-kubernetes/api/system/v1alpha1"
- "github.com/apache/dubbo-kubernetes/pkg/core"
- "github.com/apache/dubbo-kubernetes/pkg/core/runtime/component"
-)
-
-var serverLog = core.Log.WithName("intercp").WithName("catalog").WithName("server")
-
-type server struct {
- heartbeats *Heartbeats
- leaderInfo component.LeaderInfo
-
- system_proto.UnimplementedInterCpPingServiceServer
-}
-
-var _ system_proto.InterCpPingServiceServer = &server{}
-
-func NewServer(heartbeats *Heartbeats, leaderInfo component.LeaderInfo) system_proto.InterCpPingServiceServer {
- return &server{
- heartbeats: heartbeats,
- leaderInfo: leaderInfo,
- }
-}
-
-func (s *server) Ping(_ context.Context, request *system_proto.PingRequest) (*system_proto.PingResponse, error) {
- serverLog.V(1).Info("received ping", "instanceID", request.InstanceId, "address", request.Address, "ready", request.Ready)
- instance := Instance{
- Id: request.InstanceId,
- Address: request.Address,
- InterCpPort: uint16(request.InterCpPort),
- Leader: false,
- }
- if request.Ready {
- s.heartbeats.Add(instance)
- } else {
- s.heartbeats.Remove(instance)
- }
- return &system_proto.PingResponse{
- Leader: s.leaderInfo.IsLeader(),
- }, nil
-}
diff --git a/pkg/intercp/catalog/writer.go b/pkg/intercp/catalog/writer.go
deleted file mode 100644
index 4861b83..0000000
--- a/pkg/intercp/catalog/writer.go
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package catalog
-
-import (
- "context"
- "time"
-)
-
-import (
- "github.com/apache/dubbo-kubernetes/pkg/core"
- "github.com/apache/dubbo-kubernetes/pkg/core/runtime/component"
-)
-
-var writerLog = core.Log.WithName("intercp").WithName("catalog").WithName("writer")
-
-type catalogWriter struct {
- catalog Catalog
- heartbeats *Heartbeats
- instance Instance
- interval time.Duration
-}
-
-var _ component.Component = &catalogWriter{}
-
-func NewWriter(
- catalog Catalog,
- heartbeats *Heartbeats,
- instance Instance,
- interval time.Duration,
-) (component.Component, error) {
- leaderInstance := instance
- leaderInstance.Leader = true
- return &catalogWriter{
- catalog: catalog,
- heartbeats: heartbeats,
- instance: leaderInstance,
- interval: interval,
- }, nil
-}
-
-func (r *catalogWriter) Start(stop <-chan struct{}) error {
- heartbeatLog.Info("starting catalog writer")
- ctx := context.Background()
- writerLog.Info("replacing a leader in the catalog")
- if err := r.catalog.ReplaceLeader(ctx, r.instance); err != nil {
- writerLog.Error(err, "could not replace leader") // continue, it will be replaced in ticker anyways
- }
- ticker := time.NewTicker(r.interval)
- for {
- select {
- case <-ticker.C:
- instances := r.heartbeats.ResetAndCollect()
- instances = append(instances, r.instance)
- updated, err := r.catalog.Replace(ctx, instances)
- if err != nil {
- writerLog.Error(err, "could not update catalog")
- continue
- }
- if updated {
- writerLog.Info("instances catalog updated", "instances", instances)
- } else {
- writerLog.V(1).Info("no need to update instances, because the catalog is the same", "instances", instances)
- }
- case <-stop:
- return nil
- }
- }
-}
-
-func (r *catalogWriter) NeedLeaderElection() bool {
- return true
-}
diff --git a/pkg/intercp/client/client.go b/pkg/intercp/client/client.go
deleted file mode 100644
index 7f3cbf5..0000000
--- a/pkg/intercp/client/client.go
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package client
-
-import (
- "crypto/tls"
- "crypto/x509"
- "io"
- "net/url"
-)
-
-import (
- "github.com/pkg/errors"
-
- "google.golang.org/grpc"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/credentials/insecure"
-)
-
-type TLSConfig struct {
- CaCert x509.Certificate
- ClientCert tls.Certificate
-}
-
-type Conn interface {
- grpc.ClientConnInterface
- io.Closer
- GetState() connectivity.State
-}
-
-func New(serverURL string, tlsCfg *TLSConfig) (Conn, error) {
- url, err := url.Parse(serverURL)
- if err != nil {
- return nil, err
- }
- var dialOpts []grpc.DialOption
- switch url.Scheme {
- case "grpc": // not used in production
- dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
- case "grpcs":
- tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12}
- if tlsCfg != nil {
- cp := x509.NewCertPool()
- cp.AddCert(&tlsCfg.CaCert)
- tlsConfig.RootCAs = cp
- tlsConfig.Certificates = []tls.Certificate{tlsCfg.ClientCert}
- } else {
- tlsConfig.InsecureSkipVerify = true
- }
- dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
- default:
- return nil, errors.Errorf("unsupported scheme %q. Use one of %s", url.Scheme, []string{"grpc", "grpcs"})
- }
- return grpc.Dial(url.Host, dialOpts...)
-}
diff --git a/pkg/intercp/client/pool.go b/pkg/intercp/client/pool.go
deleted file mode 100644
index ed4486f..0000000
--- a/pkg/intercp/client/pool.go
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package client
-
-import (
- "context"
- "sync"
- "time"
-)
-
-import (
- "github.com/pkg/errors"
-
- "google.golang.org/grpc/connectivity"
-)
-
-import (
- "github.com/apache/dubbo-kubernetes/pkg/core"
-)
-
-var poolLog = core.Log.WithName("intercp").WithName("client").WithName("pool")
-
-type accessedConn struct {
- conn Conn
- url string
- lastAccessTime time.Time
-}
-
-// Pool keeps the list of clients to inter-cp servers.
-// Because the list of inter-cp servers changes in runtime, we need to properly manage the connections to them (initialize, share, close etc.)
-// Pool helps us to not reimplement this for every inter-cp service (catalog, envoyadmin, etc.)
-type Pool struct {
- newConn func(string, *TLSConfig) (Conn, error)
- idleDeadline time.Duration // the time after which we close the connection if it was not fetched from the pool
- now func() time.Time
- connections map[string]*accessedConn
- mut sync.Mutex
-
- tlsCfg *TLSConfig
-}
-
-var TLSNotConfigured = errors.New("tls config is not yet set")
-
-func NewPool(
- newConn func(string, *TLSConfig) (Conn, error),
- idleDeadline time.Duration,
- now func() time.Time,
-) *Pool {
- return &Pool{
- newConn: newConn,
- idleDeadline: idleDeadline,
- now: now,
- connections: map[string]*accessedConn{},
- mut: sync.Mutex{},
- }
-}
-
-func (c *Pool) Client(serverURL string) (Conn, error) {
- c.mut.Lock()
- defer c.mut.Unlock()
- if c.tlsCfg == nil {
- return nil, TLSNotConfigured
- }
- ac, ok := c.connections[serverURL]
- createNewConnection := !ok
- if ok && ac.conn.GetState() == connectivity.TransientFailure {
- createNewConnection = true
- poolLog.Info("closing broken connection", "url", serverURL)
- if err := ac.conn.Close(); err != nil {
- poolLog.Error(err, "cannot close the connection", "url", serverURL)
- }
- }
- if createNewConnection {
- poolLog.Info("creating new connection", "url", serverURL)
- conn, err := c.newConn(serverURL, c.tlsCfg)
- if err != nil {
- return nil, err
- }
- ac = &accessedConn{
- conn: conn,
- url: serverURL,
- }
- }
- ac.lastAccessTime = c.now()
- c.connections[serverURL] = ac
- return ac.conn, nil
-}
-
-// SetTLSConfig can configure TLS in runtime.
-// Because CA of the inter-cp server is managed by the CP in the runtime we cannot configure it when we create the pool.
-func (c *Pool) SetTLSConfig(tlsCfg *TLSConfig) {
- c.mut.Lock()
- c.tlsCfg = tlsCfg
- c.mut.Unlock()
-}
-
-func (c *Pool) StartCleanup(ctx context.Context, ticker *time.Ticker) {
- for {
- select {
- case now := <-ticker.C:
- c.cleanup(now)
- case <-ctx.Done():
- return
- }
- }
-}
-
-func (c *Pool) cleanup(now time.Time) {
- c.mut.Lock()
- defer c.mut.Unlock()
- for url, accessedConn := range c.connections {
- if now.Sub(accessedConn.lastAccessTime) > c.idleDeadline {
- poolLog.Info("closing connection due to lack of activity", "url", accessedConn.url)
- if err := accessedConn.conn.Close(); err != nil {
- poolLog.Error(err, "cannot close the connection", "url", accessedConn.url)
- }
- delete(c.connections, url)
- }
- }
-}
diff --git a/pkg/intercp/components.go b/pkg/intercp/components.go
deleted file mode 100644
index 8833b2a..0000000
--- a/pkg/intercp/components.go
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package intercp
-
-import (
- "time"
-)
-
-import (
- mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
- "github.com/apache/dubbo-kubernetes/pkg/core"
- "github.com/apache/dubbo-kubernetes/pkg/core/runtime"
- "github.com/apache/dubbo-kubernetes/pkg/intercp/client"
- "github.com/apache/dubbo-kubernetes/pkg/intercp/envoyadmin"
-)
-
-var log = core.Log.WithName("inter-cp")
-
-func Setup(rt runtime.Runtime) error {
- return nil
-}
-
-func DefaultClientPool() *client.Pool {
-
- return client.NewPool(client.New, 5*time.Minute, core.Now)
-}
-
-func PooledEnvoyAdminClientFn(pool *client.Pool) envoyadmin.NewClientFn {
- return func(url string) (mesh_proto.InterCPEnvoyAdminForwardServiceClient, error) {
- conn, err := pool.Client(url)
- if err != nil {
- return nil, err
- }
- return mesh_proto.NewInterCPEnvoyAdminForwardServiceClient(conn), nil
- }
-}
diff --git a/pkg/intercp/envoyadmin/forwarding_dds_client.go b/pkg/intercp/envoyadmin/forwarding_dds_client.go
deleted file mode 100644
index 604cddb..0000000
--- a/pkg/intercp/envoyadmin/forwarding_dds_client.go
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package envoyadmin
-
-import (
- "context"
- "fmt"
- "reflect"
-)
-
-import (
- "github.com/pkg/errors"
-)
-
-import (
- mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
- "github.com/apache/dubbo-kubernetes/pkg/core"
- "github.com/apache/dubbo-kubernetes/pkg/core/admin"
- core_mesh "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
- core_system "github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/system"
- "github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
- core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
- core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
- "github.com/apache/dubbo-kubernetes/pkg/dds/service"
- "github.com/apache/dubbo-kubernetes/pkg/intercp/catalog"
-)
-
-var clientLog = core.Log.WithName("intercp").WithName("envoyadmin").WithName("client")
-
-type NewClientFn = func(url string) (mesh_proto.InterCPEnvoyAdminForwardServiceClient, error)
-
-type forwardingKdsEnvoyAdminClient struct {
- resManager manager.ReadOnlyResourceManager
- cat catalog.Catalog
- instanceID string
- newClientFn NewClientFn
- fallbackClient admin.EnvoyAdminClient
-}
-
-// NewForwardingEnvoyAdminClient returns EnvoyAdminClient which is only used on Global CP in multizone environment.
-// It forwards the request to an instance of the Global CP to which Zone CP of given DPP is connected.
-//
-// For example:
-// We have 2 instances of Global CP (ins-1, ins-2). Dataplane "backend" is in zone "east".
-// The leader CP of zone "east" is connected to ins-1.
-// If we execute config dump for "backend" on ins-1, we follow the regular flow of pkg/envoy/admin/kds_client.go
-// If we execute config dump for "backend" on ins-2, we forward the request to ins-1 and then execute the regular flow.
-func NewForwardingEnvoyAdminClient(
- resManager manager.ReadOnlyResourceManager,
- cat catalog.Catalog,
- instanceID string,
- newClientFn NewClientFn,
- fallbackClient admin.EnvoyAdminClient,
-) admin.EnvoyAdminClient {
- return &forwardingKdsEnvoyAdminClient{
- resManager: resManager,
- cat: cat,
- instanceID: instanceID,
- newClientFn: newClientFn,
- fallbackClient: fallbackClient,
- }
-}
-
-var _ admin.EnvoyAdminClient = &forwardingKdsEnvoyAdminClient{}
-
-func (f *forwardingKdsEnvoyAdminClient) PostQuit(context.Context, *core_mesh.DataplaneResource) error {
- panic("not implemented")
-}
-
-func (f *forwardingKdsEnvoyAdminClient) ConfigDump(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
- instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.ConfigDumpRPC)
- if err != nil {
- return nil, err
- }
- f.logIntendedAction(proxy, instanceID)
- if instanceID == f.instanceID {
- return f.fallbackClient.ConfigDump(ctx, proxy)
- }
- client, err := f.clientForInstanceID(ctx, instanceID)
- if err != nil {
- return nil, err
- }
- req := &mesh_proto.XDSConfigRequest{
- ResourceType: string(proxy.Descriptor().Name),
- ResourceName: proxy.GetMeta().GetName(),
- ResourceMesh: proxy.GetMeta().GetMesh(),
- }
- resp, err := client.XDSConfig(ctx, req)
- if err != nil {
- return nil, err
- }
- return resp.GetConfig(), nil
-}
-
-func (f *forwardingKdsEnvoyAdminClient) Stats(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
- instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.StatsRPC)
- if err != nil {
- return nil, err
- }
- f.logIntendedAction(proxy, instanceID)
- if instanceID == f.instanceID {
- return f.fallbackClient.Stats(ctx, proxy)
- }
- client, err := f.clientForInstanceID(ctx, instanceID)
- if err != nil {
- return nil, err
- }
- req := &mesh_proto.StatsRequest{
- ResourceType: string(proxy.Descriptor().Name),
- ResourceName: proxy.GetMeta().GetName(),
- ResourceMesh: proxy.GetMeta().GetMesh(),
- }
- resp, err := client.Stats(ctx, req)
- if err != nil {
- return nil, err
- }
- return resp.GetStats(), nil
-}
-
-func (f *forwardingKdsEnvoyAdminClient) Clusters(ctx context.Context, proxy core_model.ResourceWithAddress) ([]byte, error) {
- instanceID, err := f.globalInstanceID(ctx, core_model.ZoneOfResource(proxy), service.ClustersRPC)
- if err != nil {
- return nil, err
- }
- f.logIntendedAction(proxy, instanceID)
- if instanceID == f.instanceID {
- return f.fallbackClient.Clusters(ctx, proxy)
- }
- client, err := f.clientForInstanceID(ctx, instanceID)
- if err != nil {
- return nil, err
- }
- req := &mesh_proto.ClustersRequest{
- ResourceType: string(proxy.Descriptor().Name),
- ResourceName: proxy.GetMeta().GetName(),
- ResourceMesh: proxy.GetMeta().GetMesh(),
- }
- resp, err := client.Clusters(ctx, req)
- if err != nil {
- return nil, err
- }
- return resp.GetClusters(), nil
-}
-
-func (f *forwardingKdsEnvoyAdminClient) logIntendedAction(proxy core_model.ResourceWithAddress, instanceID string) {
- log := clientLog.WithValues(
- "name", proxy.GetMeta().GetName(),
- "mesh", proxy.GetMeta().GetMesh(),
- "type", proxy.Descriptor().Name,
- "instanceID", instanceID,
- )
- if instanceID == f.instanceID {
- log.V(1).Info("zone CP of the resource is connected to this Global CP instance. Executing operation")
- } else {
- log.V(1).Info("zone CP of the resource is connected to other Global CP instance. Forwarding the request")
- }
-}
-
-func (f *forwardingKdsEnvoyAdminClient) globalInstanceID(ctx context.Context, zone string, rpcName string) (string, error) {
- zoneInsightRes := core_system.NewZoneInsightResource()
- if err := f.resManager.Get(ctx, zoneInsightRes, core_store.GetByKey(zone, core_model.NoMesh)); err != nil {
- return "", err
- }
- streams := zoneInsightRes.Spec.GetEnvoyAdminStreams()
- var globalInstanceID string
- switch rpcName {
- case service.ConfigDumpRPC:
- globalInstanceID = streams.GetConfigDumpGlobalInstanceId()
- case service.StatsRPC:
- globalInstanceID = streams.GetStatsGlobalInstanceId()
- case service.ClustersRPC:
- globalInstanceID = streams.GetClustersGlobalInstanceId()
- default:
- return "", errors.Errorf("invalid operation %s", rpcName)
- }
- if globalInstanceID == "" {
- return "", &StreamNotConnectedError{rpcName: rpcName}
- }
- return globalInstanceID, nil
-}
-
-func (f *forwardingKdsEnvoyAdminClient) clientForInstanceID(ctx context.Context, instanceID string) (mesh_proto.InterCPEnvoyAdminForwardServiceClient, error) {
- instance, err := catalog.InstanceOfID(ctx, f.cat, instanceID)
- if err != nil {
- return nil, err
- }
- return f.newClientFn(instance.InterCpURL())
-}
-
-type StreamNotConnectedError struct {
- rpcName string
-}
-
-func (e *StreamNotConnectedError) Error() string {
- return fmt.Sprintf("stream to execute %s operations is not yet connected", e.rpcName)
-}
-
-func (e *StreamNotConnectedError) Is(err error) bool {
- return reflect.TypeOf(e) == reflect.TypeOf(err)
-}
diff --git a/pkg/intercp/envoyadmin/server.go b/pkg/intercp/envoyadmin/server.go
deleted file mode 100644
index b1096e6..0000000
--- a/pkg/intercp/envoyadmin/server.go
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package envoyadmin
-
-import (
- "context"
- "errors"
-)
-
-import (
- mesh_proto "github.com/apache/dubbo-kubernetes/api/mesh/v1alpha1"
- "github.com/apache/dubbo-kubernetes/pkg/core"
- "github.com/apache/dubbo-kubernetes/pkg/core/admin"
- "github.com/apache/dubbo-kubernetes/pkg/core/resources/manager"
- "github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
- "github.com/apache/dubbo-kubernetes/pkg/core/resources/registry"
- core_store "github.com/apache/dubbo-kubernetes/pkg/core/resources/store"
-)
-
-var serverLog = core.Log.WithName("intercp").WithName("catalog").WithName("server")
-
-type server struct {
- adminClient admin.EnvoyAdminClient
- resManager manager.ReadOnlyResourceManager
- mesh_proto.UnimplementedInterCPEnvoyAdminForwardServiceServer
-}
-
-var _ mesh_proto.InterCPEnvoyAdminForwardServiceServer = &server{}
-
-func NewServer(adminClient admin.EnvoyAdminClient, resManager manager.ReadOnlyResourceManager) mesh_proto.InterCPEnvoyAdminForwardServiceServer {
- return &server{
- adminClient: adminClient,
- resManager: resManager,
- }
-}
-
-func (s *server) XDSConfig(ctx context.Context, req *mesh_proto.XDSConfigRequest) (*mesh_proto.XDSConfigResponse, error) {
- serverLog.V(1).Info("received forwarded request", "operation", "XDSConfig", "request", req)
- resWithAddr, err := s.resWithAddress(ctx, req.ResourceType, req.ResourceName, req.ResourceMesh)
- if err != nil {
- return nil, err
- }
- configDump, err := s.adminClient.ConfigDump(ctx, resWithAddr)
- if err != nil {
- return nil, err
- }
- return &mesh_proto.XDSConfigResponse{
- Result: &mesh_proto.XDSConfigResponse_Config{
- Config: configDump,
- },
- }, nil
-}
-
-func (s *server) Stats(ctx context.Context, req *mesh_proto.StatsRequest) (*mesh_proto.StatsResponse, error) {
- serverLog.V(1).Info("received forwarded request", "operation", "Stats", "request", req)
- resWithAddr, err := s.resWithAddress(ctx, req.ResourceType, req.ResourceName, req.ResourceMesh)
- if err != nil {
- return nil, err
- }
- stats, err := s.adminClient.Stats(ctx, resWithAddr)
- if err != nil {
- return nil, err
- }
- return &mesh_proto.StatsResponse{
- Result: &mesh_proto.StatsResponse_Stats{
- Stats: stats,
- },
- }, nil
-}
-
-func (s *server) Clusters(ctx context.Context, req *mesh_proto.ClustersRequest) (*mesh_proto.ClustersResponse, error) {
- serverLog.V(1).Info("received forwarded request", "operation", "Clusters", "request", req)
- resWithAddr, err := s.resWithAddress(ctx, req.ResourceType, req.ResourceName, req.ResourceMesh)
- if err != nil {
- return nil, err
- }
- clusters, err := s.adminClient.Clusters(ctx, resWithAddr)
- if err != nil {
- return nil, err
- }
- return &mesh_proto.ClustersResponse{
- Result: &mesh_proto.ClustersResponse_Clusters{
- Clusters: clusters,
- },
- }, nil
-}
-
-func (s *server) resWithAddress(ctx context.Context, typ, name, mesh string) (model.ResourceWithAddress, error) {
- obj, err := registry.Global().NewObject(model.ResourceType(typ))
- if err != nil {
- return nil, err
- }
- if err := s.resManager.Get(ctx, obj, core_store.GetByKey(name, mesh)); err != nil {
- return nil, err
- }
- resourceWithAddr, ok := obj.(model.ResourceWithAddress)
- if !ok {
- return nil, errors.New("invalid resource type")
- }
- return resourceWithAddr, nil
-}
diff --git a/pkg/intercp/server/server.go b/pkg/intercp/server/server.go
deleted file mode 100644
index 59f7356..0000000
--- a/pkg/intercp/server/server.go
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package server
-
-import (
- "fmt"
- "net"
- "net/http"
- "time"
-)
-
-import (
- "google.golang.org/grpc"
- "google.golang.org/grpc/keepalive"
-)
-
-import (
- "github.com/apache/dubbo-kubernetes/pkg/config/intercp"
- "github.com/apache/dubbo-kubernetes/pkg/core"
- "github.com/apache/dubbo-kubernetes/pkg/core/runtime/component"
-)
-
-var log = core.Log.WithName("intercp-server")
-
-const (
- grpcMaxConcurrentStreams = 1000000
- grpcKeepAliveTime = 15 * time.Second
-)
-
-type InterCpServer struct {
- config intercp.InterCpServerConfig
- grpcServer *grpc.Server
- instanceId string
-}
-
-var _ component.Component = &InterCpServer{}
-
-func New(
- config intercp.InterCpServerConfig,
- instanceId string,
-) (*InterCpServer, error) {
- grpcOptions := []grpc.ServerOption{
- grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
- grpc.KeepaliveParams(keepalive.ServerParameters{
- Time: grpcKeepAliveTime,
- Timeout: grpcKeepAliveTime,
- }),
- grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
- MinTime: grpcKeepAliveTime,
- PermitWithoutStream: true,
- }),
- }
-
- grpcOptions = append(grpcOptions)
- grpcServer := grpc.NewServer(grpcOptions...)
-
- return &InterCpServer{
- config: config,
- grpcServer: grpcServer,
- instanceId: instanceId,
- }, nil
-}
-
-func (d *InterCpServer) Start(stop <-chan struct{}) error {
- lis, err := net.Listen("tcp", fmt.Sprintf(":%d", d.config.Port))
- if err != nil {
- return err
- }
- log := log.WithValues(
- "instanceId",
- d.instanceId,
- )
-
- errChan := make(chan error)
- go func() {
- defer close(errChan)
- if err := d.grpcServer.Serve(lis); err != nil {
- if err != http.ErrServerClosed {
- log.Error(err, "terminated with an error")
- errChan <- err
- return
- }
- }
- log.Info("terminated normally")
- }()
- log.Info("starting", "interface", "0.0.0.0", "port", d.config.Port, "tls", true)
-
- select {
- case <-stop:
- log.Info("stopping gracefully")
- d.grpcServer.GracefulStop()
- log.Info("stopped")
- return nil
- case err := <-errChan:
- return err
- }
-}
-
-func (d *InterCpServer) NeedLeaderElection() bool {
- return false
-}
-
-func (d *InterCpServer) GrpcServer() *grpc.Server {
- return d.grpcServer
-}