fix(cp-server): run h2c server when tls files are not set
diff --git a/app/dubboctl/cmd/proxy.go b/app/dubboctl/cmd/proxy.go
index f901ee1..31fdf76 100644
--- a/app/dubboctl/cmd/proxy.go
+++ b/app/dubboctl/cmd/proxy.go
@@ -20,14 +20,6 @@
 import (
 	"context"
 	"fmt"
-<<<<<<< HEAD
-	"github.com/apache/dubbo-kubernetes/pkg/core"
-	"github.com/apache/dubbo-kubernetes/pkg/core/logger"
-	"github.com/apache/dubbo-kubernetes/pkg/util/proto"
-	"github.com/pkg/errors"
-	"go.uber.org/zap/zapcore"
-=======
->>>>>>> b165c47df48705310408955be94d62e3675c974b
 	"io"
 	"os"
 	"path/filepath"
@@ -48,6 +40,7 @@
 	"github.com/apache/dubbo-kubernetes/pkg/config/app/dubboctl"
 	"github.com/apache/dubbo-kubernetes/pkg/core"
 	dubbo_cmd "github.com/apache/dubbo-kubernetes/pkg/core/cmd"
+	"github.com/apache/dubbo-kubernetes/pkg/core/logger"
 	"github.com/apache/dubbo-kubernetes/pkg/core/resources/apis/mesh"
 	"github.com/apache/dubbo-kubernetes/pkg/core/resources/model"
 	"github.com/apache/dubbo-kubernetes/pkg/core/resources/model/rest"
diff --git a/pkg/config/dp-server/config.go b/pkg/config/dp-server/config.go
index e0d1688..1256b61 100644
--- a/pkg/config/dp-server/config.go
+++ b/pkg/config/dp-server/config.go
@@ -48,8 +48,10 @@
 	// were failing to reconnect (we observed this in Projected Service Account
 	// Tokens e2e tests, which started flaking a lot after introducing explicit
 	// 1s timeout)
-	// TlsCertFile defines a path to a file with PEM-encoded TLS cert. If empty, autoconfigured from general.tlsCertFile
-	TlsCertFile       string                `json:"tlsCertFile" envconfig:"dubbo_dp_server_tls_cert_file"`
+	// TlsCertFile defines a path to a file with PEM-encoded TLS cert. If empty, start the plain HTTP/2 server (h2c).
+	TlsCertFile string `json:"tlsCertFile" envconfig:"dubbo_dp_server_tls_cert_file"`
+	// TlsKeyFile defines a path to a file with PEM-encoded TLS key. If empty, start the plain HTTP/2 server (h2c).
+	TlsKeyFile        string                `json:"tlsKeyFile" envconfig:"kuma_diagnostics_tls_key_file"`
 	ReadHeaderTimeout config_types.Duration `json:"readHeaderTimeout" envconfig:"dubbo_dp_server_read_header_timeout"`
 	// Port of the DP Server
 	Port int `json:"port" envconfig:"dubbo_dp_server_port"`
diff --git a/pkg/dp-server/server/server.go b/pkg/dp-server/server/server.go
index ad269c0..15dd47d 100644
--- a/pkg/dp-server/server/server.go
+++ b/pkg/dp-server/server/server.go
@@ -18,15 +18,24 @@
 package server
 
 import (
+	"context"
+	"crypto/tls"
 	"fmt"
-	"github.com/apache/dubbo-kubernetes/pkg/core/logger"
-	"google.golang.org/grpc/reflection"
-	"net"
 	"net/http"
+	"strings"
 	"time"
 )
 
 import (
+	"github.com/bakito/go-log-logr-adapter/adapter"
+
+	http_prometheus "github.com/slok/go-http-metrics/metrics/prometheus"
+	"github.com/slok/go-http-metrics/middleware"
+	"github.com/slok/go-http-metrics/middleware/std"
+
+	"golang.org/x/net/http2"
+	"golang.org/x/net/http2/h2c"
+
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/keepalive"
 )
@@ -47,9 +56,11 @@
 type Filter func(writer http.ResponseWriter, request *http.Request) bool
 
 type DpServer struct {
-	config      dp_server.DpServerConfig
-	PlainServer *grpc.Server
-	httpMux     *http.ServeMux
+	config         dp_server.DpServerConfig
+	httpMux        *http.ServeMux
+	grpcServer     *grpc.Server
+	filter         Filter
+	promMiddleware middleware.Middleware
 }
 
 var _ component.Component = &DpServer{}
@@ -66,40 +77,61 @@
 			PermitWithoutStream: true,
 		}),
 	}
+	grpcServer := grpc.NewServer(grpcOptions...)
 
-	srv := &DpServer{
-		config:  config,
-		httpMux: http.NewServeMux(),
+	promMiddleware := middleware.New(middleware.Config{
+		Recorder: http_prometheus.NewRecorder(http_prometheus.Config{
+			Prefix: "dp_server",
+		}),
+	})
+
+	return &DpServer{
+		config:         config,
+		httpMux:        http.NewServeMux(),
+		grpcServer:     grpcServer,
+		filter:         filter,
+		promMiddleware: promMiddleware,
 	}
-	srv.PlainServer = grpc.NewServer(grpcOptions...)
-	reflection.Register(srv.PlainServer)
-
-	return srv
 }
 
 func (d *DpServer) Start(stop <-chan struct{}) error {
-	plainLis, err := net.Listen("tcp", fmt.Sprintf(":%d", d.config.Port))
-	if err != nil {
-		return err
+	tlsConfig := &tls.Config{MinVersion: tls.VersionTLS12} // To make gosec pass this is always set after
+	server := &http.Server{
+		Addr:      fmt.Sprintf(":%d", d.config.Port),
+		Handler:   http.HandlerFunc(d.handle),
+		TLSConfig: tlsConfig,
+		ErrorLog:  adapter.ToStd(log),
 	}
-	plainErrChan := make(chan error)
+
+	errChan := make(chan error)
 
 	go func() {
-		defer close(plainErrChan)
-		if err = d.PlainServer.Serve(plainLis); err != nil {
-			logger.Sugar().Error(err, "[cp-server] terminated with an error")
-			plainErrChan <- err
+		defer close(errChan)
+		var err error
+		if d.config.TlsCertFile == "" || d.config.TlsKeyFile == "" {
+			// h2c is used for HTTP/2 cleartext upgrade
+			// so that we can normally run xds server on gRPC
+			server.Handler = h2c.NewHandler(server.Handler, &http2.Server{})
+			err = server.ListenAndServe()
 		} else {
-			logger.Sugar().Info("[cp-server] terminated normally")
+			err = server.ListenAndServeTLS(d.config.TlsCertFile, d.config.TlsKeyFile)
 		}
+		if err != nil {
+			if err != http.ErrServerClosed {
+				log.Error(err, "terminated with an error")
+				errChan <- err
+				return
+			}
+		}
+		log.Info("terminated normally")
 	}()
+	log.Info("starting", "interface", "0.0.0.0", "port", d.config.Port, "tls", true)
+
 	select {
 	case <-stop:
 		log.Info("stopping")
-		logger.Sugar().Info("[cp-server] stopping gracefully")
-		d.PlainServer.GracefulStop()
-		return nil
-	case err := <-plainErrChan:
+		return server.Shutdown(context.Background())
+	case err := <-errChan:
 		return err
 	}
 }
@@ -108,10 +140,28 @@
 	return false
 }
 
+func (d *DpServer) handle(writer http.ResponseWriter, request *http.Request) {
+	if !d.filter(writer, request) {
+		return
+	}
+	// add filter function that will be in runtime, and we will implement it in kong-mesh
+	if request.ProtoMajor == 2 && strings.Contains(request.Header.Get("Content-Type"), "application/grpc") {
+		d.grpcServer.ServeHTTP(writer, request)
+	} else {
+		// we only want to measure HTTP not GRPC requests because they can mess up metrics
+		// for example ADS bi-directional stream counts as one really long request
+		std.Handler("", d.promMiddleware, d.httpMux).ServeHTTP(writer, request)
+	}
+}
+
 func (d *DpServer) HTTPMux() *http.ServeMux {
 	return d.httpMux
 }
 
 func (d *DpServer) GrpcServer() *grpc.Server {
-	return d.PlainServer
+	return d.grpcServer
+}
+
+func (d *DpServer) SetFilter(filter Filter) {
+	d.filter = filter
 }