Merge pull request #259 from yin1999/run-h2c-server
fix(cp-server): run h2c server when tls files are not set
diff --git a/app/dubboctl/cmd/proxy.go b/app/dubboctl/cmd/proxy.go
index f901ee1..31fdf76 100644
--- a/app/dubboctl/cmd/proxy.go
+++ b/app/dubboctl/cmd/proxy.go
@@ -20,14 +20,6 @@
import (
"context"
"fmt"
-<<<<<<< HEAD
- "github.com/apache/dubbo-kubernetes/pkg/core"
- "github.com/apache/dubbo-kubernetes/pkg/core/logger"
- "github.com/apache/dubbo-kubernetes/pkg/util/proto"
- "github.com/pkg/errors"
- "go.uber.org/zap/zapcore"
-=======
->>>>>>> b165c47df48705310408955be94d62e3675c974b
"io"
"os"
"path/filepath"
@@ -48,6 +40,7 @@
"github.com/apache/dubbo-kubernetes/pkg/config/app/dubboctl"
"github.com/apache/dubbo-kubernetes/pkg/core"
dubbo_cmd "github.com/apache/dubbo-kubernetes/pkg/core/cmd"
+ "github.com/apache/dubbo-kubernetes/pkg/core/logger"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
"github.com/apache/dubbo-kubernetes/pkg/core/resources/model/rest"
diff --git a/pkg/config/dp-server/config.go b/pkg/config/dp-server/config.go
index e0d1688..1256b61 100644
--- a/pkg/config/dp-server/config.go
+++ b/pkg/config/dp-server/config.go
@@ -48,8 +48,10 @@
// were failing to reconnect (we observed this in Projected Service Account
// Tokens e2e tests, which started flaking a lot after introducing explicit
// 1s timeout)
- // TlsCertFile defines a path to a file with PEM-encoded TLS cert. If empty, autoconfigured from general.tlsCertFile
- TlsCertFile string `json:"tlsCertFile" envconfig:"dubbo_dp_server_tls_cert_file"`
+ // TlsCertFile defines a path to a file with PEM-encoded TLS cert. If empty, start the plain HTTP/2 server (h2c).
+ TlsCertFile string `json:"tlsCertFile" envconfig:"dubbo_dp_server_tls_cert_file"`
+ // TlsKeyFile defines a path to a file with PEM-encoded TLS key. If empty, start the plain HTTP/2 server (h2c).
+ TlsKeyFile string `json:"tlsKeyFile" envconfig:"dubbo_dp_server_tls_key_file"`
ReadHeaderTimeout config_types.Duration `json:"readHeaderTimeout" envconfig:"dubbo_dp_server_read_header_timeout"`
// Port of the DP Server
Port int `json:"port" envconfig:"dubbo_dp_server_port"`
diff --git a/pkg/dp-server/server/server.go b/pkg/dp-server/server/server.go
index ad269c0..15dd47d 100644
--- a/pkg/dp-server/server/server.go
+++ b/pkg/dp-server/server/server.go
@@ -18,15 +18,24 @@
package server
import (
+ "context"
+ "crypto/tls"
"fmt"
- "github.com/apache/dubbo-kubernetes/pkg/core/logger"
- "google.golang.org/grpc/reflection"
- "net"
"net/http"
+ "strings"
"time"
)
import (
+ "github.com/bakito/go-log-logr-adapter/adapter"
+
+ http_prometheus "github.com/slok/go-http-metrics/metrics/prometheus"
+ "github.com/slok/go-http-metrics/middleware"
+ "github.com/slok/go-http-metrics/middleware/std"
+
+ "golang.org/x/net/http2"
+ "golang.org/x/net/http2/h2c"
+
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
@@ -47,9 +56,11 @@
type Filter func(writer http.ResponseWriter, request *http.Request) bool
type DpServer struct {
- config dp_server.DpServerConfig
- PlainServer *grpc.Server
- httpMux *http.ServeMux
+ config dp_server.DpServerConfig // read by Start for the port and the TLS cert/key paths
+ httpMux *http.ServeMux // exposed via HTTPMux; serves every request that is not gRPC
+ grpcServer *grpc.Server // exposed via GrpcServer; serves HTTP/2 requests with a gRPC content type
+ filter Filter // called first by handle; a false result ends the request there
+ promMiddleware middleware.Middleware // wraps httpMux in handle so only non-gRPC traffic is measured
}
var _ component.Component = &DpServer{}
@@ -66,40 +77,61 @@
PermitWithoutStream: true,
}),
}
+ grpcServer := grpc.NewServer(grpcOptions...)
- srv := &DpServer{
- config: config,
- httpMux: http.NewServeMux(),
+ promMiddleware := middleware.New(middleware.Config{
+ Recorder: http_prometheus.NewRecorder(http_prometheus.Config{
+ Prefix: "dp_server",
+ }),
+ })
+
+ return &DpServer{
+ config: config,
+ httpMux: http.NewServeMux(),
+ grpcServer: grpcServer,
+ filter: filter,
+ promMiddleware: promMiddleware,
}
- srv.PlainServer = grpc.NewServer(grpcOptions...)
- reflection.Register(srv.PlainServer)
-
- return srv
}
func (d *DpServer) Start(stop <-chan struct{}) error {
- plainLis, err := net.Listen("tcp", fmt.Sprintf(":%d", d.config.Port))
- if err != nil {
- return err
+ tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12} // explicit TLS 1.2 floor keeps gosec quiet; only the ListenAndServeTLS path uses it
+ server := &http.Server{
+ Addr: fmt.Sprintf(":%d", d.config.Port),
+ Handler: http.HandlerFunc(d.handle),
+ TLSConfig: tlsConfig,
+ ErrorLog: adapter.ToStd(log),
 }
- plainErrChan := make(chan error)
+
+ errChan := make(chan error, 1) // buffered: the serve goroutine must be able to report and exit even after Start returned
 go func() {
- defer close(plainErrChan)
- if err = d.PlainServer.Serve(plainLis); err != nil {
- logger.Sugar().Error(err, "[cp-server] terminated with an error")
- plainErrChan <- err
+ defer close(errChan)
+ var err error
+ if d.config.TlsCertFile == "" || d.config.TlsKeyFile == "" {
+ // cert or key missing: serve cleartext and wrap the handler with h2c
+ // so HTTP/2 (and therefore the gRPC xds server) still works without TLS
+ server.Handler = h2c.NewHandler(server.Handler, &http2.Server{})
+ err = server.ListenAndServe()
 } else {
- logger.Sugar().Info("[cp-server] terminated normally")
+ err = server.ListenAndServeTLS(d.config.TlsCertFile, d.config.TlsKeyFile)
 }
+ if err != nil {
+ if err != http.ErrServerClosed { // ErrServerClosed is the expected result of Shutdown, not a failure
+ log.Error(err, "terminated with an error")
+ errChan <- err
+ return
+ }
+ }
+ log.Info("terminated normally")
 }()
+ log.Info("starting", "interface", "0.0.0.0", "port", d.config.Port, "tls", d.config.TlsCertFile != "" && d.config.TlsKeyFile != "")
+
 select {
 case <-stop:
 log.Info("stopping")
- logger.Sugar().Info("[cp-server] stopping gracefully")
- d.PlainServer.GracefulStop()
- return nil
- case err := <-plainErrChan:
+ return server.Shutdown(context.Background()) // NOTE(review): no deadline, so this may wait on long-lived streams - confirm
+ case err := <-errChan:
 return err
 }
}
@@ -108,10 +140,28 @@
return false
}
+func (d *DpServer) handle(writer http.ResponseWriter, request *http.Request) {
+ // the filter gets the first look at every request; false means do not dispatch it
+ if allowed := d.filter(writer, request); !allowed {
+ return
+ }
+ contentType := request.Header.Get("Content-Type")
+ if request.ProtoMajor == 2 && strings.Contains(contentType, "application/grpc") {
+ d.grpcServer.ServeHTTP(writer, request)
+ return
+ }
+ // only plain HTTP goes through the metrics middleware: a gRPC stream such as ADS would be recorded as one very long request
+ std.Handler("", d.promMiddleware, d.httpMux).ServeHTTP(writer, request)
+}
+
func (d *DpServer) HTTPMux() *http.ServeMux {
return d.httpMux
}
func (d *DpServer) GrpcServer() *grpc.Server {
- return d.PlainServer
+ return d.grpcServer // the same server handle dispatches gRPC requests to
+}
+
+func (d *DpServer) SetFilter(filter Filter) {
+ d.filter = filter // replaces the filter that handle calls before dispatching a request
}