blob: 492d0f7520a89d8d66d251cef20354a9240bdb2e [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package endpoint
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"strconv"
"strings"
"time"
)
import (
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/admin"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/health"
grpcHealth "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/xds"
"k8s.io/utils/env"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/istio-agent/grpcxds"
"github.com/apache/dubbo-go-pixiu/pkg/test/echo"
"github.com/apache/dubbo-go-pixiu/pkg/test/echo/common"
"github.com/apache/dubbo-go-pixiu/pkg/test/echo/proto"
"github.com/apache/dubbo-go-pixiu/pkg/test/echo/server/forwarder"
"github.com/apache/dubbo-go-pixiu/pkg/test/util/retry"
)
var _ Instance = &grpcInstance{}
// grpcServer is the intersection of used methods for grpc.Server and xds.GRPCServer
type grpcServer interface {
reflection.GRPCServer
Serve(listener net.Listener) error
Stop()
}
type grpcInstance struct {
Config
server grpcServer
cleanups []func()
}
func newGRPC(config Config) Instance {
return &grpcInstance{
Config: config,
}
}
func (s *grpcInstance) GetConfig() Config {
return s.Config
}
func (s *grpcInstance) newServer(opts ...grpc.ServerOption) grpcServer {
if s.Port.XDSServer {
if len(s.Port.XDSTestBootstrap) > 0 {
opts = append(opts, xds.BootstrapContentsForTesting(s.Port.XDSTestBootstrap))
}
epLog.Infof("Using xDS for serverside gRPC on %d", s.Port.Port)
return xds.NewGRPCServer(opts...)
}
return grpc.NewServer(opts...)
}
func (s *grpcInstance) Start(onReady OnReadyFunc) error {
// Listen on the given port and update the port if it changed from what was passed in.
listener, p, err := listenOnAddress(s.ListenerIP, s.Port.Port)
if err != nil {
return err
}
// Store the actual listening port back to the argument.
s.Port.Port = p
var opts []grpc.ServerOption
if s.Port.TLS {
epLog.Infof("Listening GRPC (over TLS) on %v", p)
// Create the TLS credentials
creds, errCreds := credentials.NewServerTLSFromFile(s.TLSCert, s.TLSKey)
if errCreds != nil {
epLog.Errorf("could not load TLS keys: %s", errCreds)
}
opts = append(opts, grpc.Creds(creds))
} else if s.Port.XDSServer {
epLog.Infof("Listening GRPC (over xDS-configured mTLS) on %v", p)
creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
FallbackCreds: insecure.NewCredentials(),
})
if err != nil {
return err
}
opts = append(opts, grpc.Creds(creds))
} else {
epLog.Infof("Listening GRPC on %v", p)
}
s.server = s.newServer(opts...)
// add the standard grpc health check
healthServer := health.NewServer()
grpcHealth.RegisterHealthServer(s.server, healthServer)
proto.RegisterEchoTestServiceServer(s.server, &EchoGrpcHandler{
Config: s.Config,
})
reflection.Register(s.server)
if val, _ := env.GetBool("EXPOSE_GRPC_ADMIN", false); val {
cleanup, err := admin.Register(s.server)
if err != nil {
return err
}
s.cleanups = append(s.cleanups, cleanup)
}
// Start serving GRPC traffic.
go func() {
err := s.server.Serve(listener)
epLog.Warnf("Port %d listener terminated with error: %v", p, err)
}()
// Notify the WaitGroup once the port has transitioned to ready.
go s.awaitReady(func() {
healthServer.SetServingStatus("", grpcHealth.HealthCheckResponse_SERVING)
onReady()
}, listener)
return nil
}
func (s *grpcInstance) awaitReady(onReady OnReadyFunc, listener net.Listener) {
defer onReady()
err := retry.UntilSuccess(func() error {
cert, key, ca, err := s.certsFromBootstrapForReady()
if err != nil {
return err
}
req := &proto.ForwardEchoRequest{
Url: "grpc://" + listener.Addr().String(),
Message: "hello",
TimeoutMicros: common.DurationToMicros(readyInterval),
}
if s.Port.XDSReadinessTLS {
// TODO: using the servers key/cert is not always valid, it may not be allowed to make requests to itself
req.CertFile = cert
req.KeyFile = key
req.CaCertFile = ca
req.InsecureSkipVerify = true
}
f, err := forwarder.New(forwarder.Config{
XDSTestBootstrap: s.Port.XDSTestBootstrap,
Request: req,
})
defer func() {
_ = f.Close()
}()
if err != nil {
return err
}
_, err = f.Run(context.Background())
return err
}, retry.Timeout(readyTimeout), retry.Delay(readyInterval))
if err != nil {
epLog.Errorf("readiness failed for GRPC endpoint %s: %v", listener.Addr().String(), err)
} else {
epLog.Infof("ready for GRPC endpoint %s", listener.Addr().String())
}
}
// TODO (hack) we have to send certs OR use xds:///fqdn. We don't know our own fqdn, and even if we did
// we could send traffic to another instance. Instead we look into gRPC internals to authenticate with ourself.
func (s *grpcInstance) certsFromBootstrapForReady() (cert string, key string, ca string, err error) {
if !s.Port.XDSServer {
return
}
var bootstrapData []byte
if data := s.Port.XDSTestBootstrap; len(data) > 0 {
bootstrapData = data
} else if path := os.Getenv("GRPC_XDS_BOOTSTRAP"); len(path) > 0 {
bootstrapData, err = os.ReadFile(path)
} else if data := os.Getenv("GRPC_XDS_BOOTSTRAP_CONFIG"); len(data) > 0 {
bootstrapData = []byte(data)
}
var bootstrap grpcxds.Bootstrap
if uerr := json.Unmarshal(bootstrapData, &bootstrap); uerr != nil {
err = uerr
return
}
certs := bootstrap.FileWatcherProvider()
if certs == nil {
err = fmt.Errorf("no certs found in bootstrap")
return
}
cert = certs.CertificateFile
key = certs.PrivateKeyFile
ca = certs.CACertificateFile
return
}
func (s *grpcInstance) Close() error {
if s.server != nil {
s.server.Stop()
}
for _, cleanup := range s.cleanups {
cleanup()
}
return nil
}
type EchoGrpcHandler struct {
proto.UnimplementedEchoTestServiceServer
Config
}
func (h *EchoGrpcHandler) Echo(ctx context.Context, req *proto.EchoRequest) (*proto.EchoResponse, error) {
defer common.Metrics.GrpcRequests.With(common.PortLabel.Value(strconv.Itoa(h.Port.Port))).Increment()
body := bytes.Buffer{}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
for key, values := range md {
if strings.HasSuffix(key, "-bin") {
// Skip binary headers.
continue
}
field := key
if key == ":authority" {
for _, value := range values {
writeField(&body, echo.HostField, value)
}
}
for _, value := range values {
writeRequestHeader(&body, field, value)
}
}
}
id := uuid.New()
epLog.WithLabels("message", req.GetMessage(), "headers", md, "id", id).Infof("GRPC Request")
portNumber := 0
if h.Port != nil {
portNumber = h.Port.Port
}
ip := "0.0.0.0"
if peerInfo, ok := peer.FromContext(ctx); ok {
ip, _, _ = net.SplitHostPort(peerInfo.Addr.String())
}
writeField(&body, echo.StatusCodeField, strconv.Itoa(http.StatusOK))
writeField(&body, echo.ServiceVersionField, h.Version)
writeField(&body, echo.ServicePortField, strconv.Itoa(portNumber))
writeField(&body, echo.ClusterField, h.Cluster)
writeField(&body, echo.IPField, ip)
writeField(&body, echo.IstioVersionField, h.IstioVersion)
writeField(&body, echo.ProtocolField, "GRPC")
writeField(&body, "Echo", req.GetMessage())
if hostname, err := os.Hostname(); err == nil {
writeField(&body, echo.HostnameField, hostname)
}
epLog.WithLabels("id", id).Infof("GRPC Response")
return &proto.EchoResponse{Message: body.String()}, nil
}
func (h *EchoGrpcHandler) ForwardEcho(ctx context.Context, req *proto.ForwardEchoRequest) (*proto.ForwardEchoResponse, error) {
id := uuid.New()
l := epLog.WithLabels("url", req.Url, "id", id)
l.Infof("ForwardEcho request")
t0 := time.Now()
instance, err := forwarder.New(forwarder.Config{
Request: req,
})
if err != nil {
return nil, err
}
defer func() { _ = instance.Close() }()
ret, err := instance.Run(ctx)
if err == nil {
l.WithLabels("latency", time.Since(t0)).Infof("ForwardEcho response complete: %v", ret.GetOutput())
} else {
l.WithLabels("latency", time.Since(t0)).Infof("ForwardEcho response failed: %v", err)
}
return ret, err
}