fmt
diff --git a/pkg/dp-server/server/server.go b/pkg/dp-server/server/server.go
index 6d39626..ad269c0 100644
--- a/pkg/dp-server/server/server.go
+++ b/pkg/dp-server/server/server.go
@@ -18,21 +18,15 @@
package server
import (
- "context"
- "crypto/tls"
"fmt"
+ "github.com/apache/dubbo-kubernetes/pkg/core/logger"
+ "google.golang.org/grpc/reflection"
+ "net"
"net/http"
- "strings"
"time"
)
import (
- "github.com/bakito/go-log-logr-adapter/adapter"
-
- http_prometheus "github.com/slok/go-http-metrics/metrics/prometheus"
- "github.com/slok/go-http-metrics/middleware"
- "github.com/slok/go-http-metrics/middleware/std"
-
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
@@ -53,11 +47,9 @@
type Filter func(writer http.ResponseWriter, request *http.Request) bool
type DpServer struct {
- config dp_server.DpServerConfig
- httpMux *http.ServeMux
- grpcServer *grpc.Server
- filter Filter
- promMiddleware middleware.Middleware
+ config dp_server.DpServerConfig
+ PlainServer *grpc.Server
+ httpMux *http.ServeMux
}
var _ component.Component = &DpServer{}
@@ -74,52 +66,40 @@
PermitWithoutStream: true,
}),
}
- grpcServer := grpc.NewServer(grpcOptions...)
- promMiddleware := middleware.New(middleware.Config{
- Recorder: http_prometheus.NewRecorder(http_prometheus.Config{
- Prefix: "dp_server",
- }),
- })
-
- return &DpServer{
- config: config,
- httpMux: http.NewServeMux(),
- grpcServer: grpcServer,
- filter: filter,
- promMiddleware: promMiddleware,
+ srv := &DpServer{
+ config: config,
+ httpMux: http.NewServeMux(),
}
+ srv.PlainServer = grpc.NewServer(grpcOptions...)
+ reflection.Register(srv.PlainServer)
+
+ return srv
}
func (d *DpServer) Start(stop <-chan struct{}) error {
- tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12} // To make gosec pass this is always set after
- server := &http.Server{
- Addr: fmt.Sprintf(":%d", d.config.Port),
- Handler: http.HandlerFunc(d.handle),
- TLSConfig: tlsConfig,
- ErrorLog: adapter.ToStd(log),
+ plainLis, err := net.Listen("tcp", fmt.Sprintf(":%d", d.config.Port))
+ if err != nil {
+ return err
}
-
- errChan := make(chan error)
+ plainErrChan := make(chan error)
go func() {
- defer close(errChan)
- if err := server.ListenAndServe(); err != nil {
- if err != http.ErrServerClosed {
- log.Error(err, "terminated with an error")
- errChan <- err
- return
- }
+ defer close(plainErrChan)
+ if err = d.PlainServer.Serve(plainLis); err != nil {
+ logger.Sugar().Error(err, "[cp-server] terminated with an error")
+ plainErrChan <- err
+ } else {
+ logger.Sugar().Info("[cp-server] terminated normally")
}
- log.Info("terminated normally")
}()
- log.Info("starting", "interface", "0.0.0.0", "port", d.config.Port, "tls", true)
-
select {
case <-stop:
log.Info("stopping")
- return server.Shutdown(context.Background())
- case err := <-errChan:
+ logger.Sugar().Info("[cp-server] stopping gracefully")
+ d.PlainServer.GracefulStop()
+ return nil
+ case err := <-plainErrChan:
return err
}
}
@@ -128,28 +108,10 @@
return false
}
-func (d *DpServer) handle(writer http.ResponseWriter, request *http.Request) {
- if !d.filter(writer, request) {
- return
- }
- // add filter function that will be in runtime, and we will implement it in kong-mesh
- if request.ProtoMajor == 2 && strings.Contains(request.Header.Get("Content-Type"), "application/grpc") {
- d.grpcServer.ServeHTTP(writer, request)
- } else {
- // we only want to measure HTTP not GRPC requests because they can mess up metrics
- // for example ADS bi-directional stream counts as one really long request
- std.Handler("", d.promMiddleware, d.httpMux).ServeHTTP(writer, request)
- }
-}
-
func (d *DpServer) HTTPMux() *http.ServeMux {
return d.httpMux
}
func (d *DpServer) GrpcServer() *grpc.Server {
- return d.grpcServer
-}
-
-func (d *DpServer) SetFilter(filter Filter) {
- d.filter = filter
+ return d.PlainServer
}
diff --git a/pkg/dubbo/components.go b/pkg/dubbo/components.go
index 0fc70f9..7d39d16 100644
--- a/pkg/dubbo/components.go
+++ b/pkg/dubbo/components.go
@@ -67,6 +67,7 @@
dubboPusher,
rt.ResourceManager(),
rt.Transactions(),
+ rt.Config().Multizone.Zone.Name,
)
mesh_proto.RegisterMetadataServiceServer(rt.DpServer().GrpcServer(), metadata)
return rt.Add(dubboPusher, serviceMapping, metadata)
diff --git a/pkg/dubbo/metadata/server.go b/pkg/dubbo/metadata/server.go
index bb8f69e..60a8b4f 100644
--- a/pkg/dubbo/metadata/server.go
+++ b/pkg/dubbo/metadata/server.go
@@ -54,9 +54,10 @@
type MetadataServer struct {
mesh_proto.MetadataServiceServer
- config dubbo.DubboConfig
- queue chan *RegisterRequest
- pusher pusher.Pusher
+ localZone string
+ config dubbo.DubboConfig
+ queue chan *RegisterRequest
+ pusher pusher.Pusher
ctx context.Context
resourceManager manager.ResourceManager
@@ -80,6 +81,7 @@
pusher pusher.Pusher,
resourceManager manager.ResourceManager,
transactions core_store.Transactions,
+ localZone string,
) *MetadataServer {
return &MetadataServer{
config: config,
@@ -88,6 +90,7 @@
ctx: ctx,
resourceManager: resourceManager,
transactions: transactions,
+ localZone: localZone,
}
}
@@ -349,6 +352,7 @@
if core_store.IsResourceNotFound(err) {
// create if not found
metadata.Spec = newMetadata
+ metadata.Spec.Zone = m.localZone
err = m.resourceManager.Create(m.ctx, metadata, core_store.CreateBy(core_model.ResourceKey{
Mesh: key.Mesh,
Name: key.Name,
diff --git a/pkg/util/xds/stats_callbacks.go b/pkg/util/xds/stats_callbacks.go
index cad899a..06a6f47 100644
--- a/pkg/util/xds/stats_callbacks.go
+++ b/pkg/util/xds/stats_callbacks.go
@@ -86,43 +86,6 @@
configsQueue: map[string]time.Time{},
}
- stats.responsesSentMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
- Name: dsType + "_responses_sent",
- Help: "Number of responses sent by the server to a client",
- }, []string{"type_url"})
- if err := metrics.Register(stats.responsesSentMetric); err != nil {
- return nil, err
- }
-
- stats.requestsReceivedMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
- Name: dsType + "_requests_received",
- Help: "Number of confirmations requests from a client",
- }, []string{"type_url", "confirmation"})
- if err := metrics.Register(stats.requestsReceivedMetric); err != nil {
- return nil, err
- }
-
- streamsActive := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
- Name: dsType + "_streams_active",
- Help: "Number of active connections between a server and a client",
- }, func() float64 {
- stats.RLock()
- defer stats.RUnlock()
- return float64(stats.streamsActive)
- })
- if err := metrics.Register(streamsActive); err != nil {
- return nil, err
- }
-
- stats.deliveryMetricName = dsType + "_delivery"
- stats.deliveryMetric = prometheus.NewSummary(prometheus.SummaryOpts{
- Name: stats.deliveryMetricName,
- Help: "Summary of config delivery including a response (ACK/NACK) from the client",
- })
- if err := metrics.Register(stats.deliveryMetric); err != nil {
- return nil, err
- }
-
return stats, nil
}
diff --git a/pkg/xds/server/components.go b/pkg/xds/server/components.go
index c69ee28..166e632 100644
--- a/pkg/xds/server/components.go
+++ b/pkg/xds/server/components.go
@@ -18,6 +18,7 @@
package server
import (
+ util_xds "github.com/apache/dubbo-kubernetes/pkg/util/xds"
"github.com/pkg/errors"
)
@@ -42,6 +43,7 @@
)
func RegisterXDS(rt core_runtime.Runtime) error {
+ statsCallbacks, err := util_xds.NewStatsCallbacks(nil, "xds")
claCache, err := cla.NewCache(rt.Config().Store.Cache.ExpirationTime.Duration)
if err != nil {
return err
@@ -51,7 +53,7 @@
CLACache: claCache,
Zone: "",
}
- if err := v3.RegisterXDS(nil, envoyCpCtx, rt); err != nil {
+ if err := v3.RegisterXDS(statsCallbacks, envoyCpCtx, rt); err != nil {
return errors.Wrap(err, "could not register V3 XDS")
}
return nil
diff --git a/test/app/provider/deployment.yaml b/test/app/provider/deployment.yaml
index 52d34ab..c2bc771 100644
--- a/test/app/provider/deployment.yaml
+++ b/test/app/provider/deployment.yaml
@@ -26,7 +26,7 @@
template:
metadata:
annotations:
- dubbo.io/xds-enable: enabled
+ dubbo.io/ingress: enabled
labels:
app: dubbo-samples-apiserver-provider
spec:
diff --git a/tools/xds-client/main.go b/tools/xds-client/main.go
index 47e9aef..1a86f9c 100644
--- a/tools/xds-client/main.go
+++ b/tools/xds-client/main.go
@@ -52,7 +52,7 @@
outbounds int
rampUpPeriod time.Duration
}{
- xdsServerAddress: "grpcs://localhost:5678",
+ xdsServerAddress: "grpc://localhost:5678",
dps: 100,
services: 50,
inbounds: 1,
@@ -100,7 +100,7 @@
}
dp := &unversioned.Resource{
- Meta: rest_v1alpha1.ResourceMeta{Mesh: "default", Name: fmt.Sprintf("dataplane-%d", i), Type: "Dataplane"},
+ Meta: rest_v1alpha1.ResourceMeta{Mesh: "default", Name: fmt.Sprintf("dataplane-%d.dubbo-system", i), Type: "Dataplane"},
Spec: dpSpec,
}