| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/helloworld.proto |
| //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/import.proto |
| //go:generate protoc --include_imports --descriptor_set_out=proto.pb --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/src.proto |
| //go:generate protoc --descriptor_set_out=echo.pb --include_imports --proto_path=$PWD/proto echo.proto |
| //go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/echo.proto |
| |
| // Package main implements a server for Greeter service. |
| package main |
| |
| import ( |
| "context" |
| "crypto/tls" |
| "crypto/x509" |
| "flag" |
| "fmt" |
| "io" |
| "log" |
| "net" |
| "net/http" |
| "os" |
| "os/signal" |
| "strings" |
| "syscall" |
| "time" |
| |
| "golang.org/x/net/http2" |
| "golang.org/x/net/http2/h2c" |
| |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/credentials" |
| "google.golang.org/grpc/reflection" |
| "google.golang.org/grpc/status" |
| |
| pb "github.com/api7/grpc_server_example/proto" |
| ) |
| |
// Listen addresses, each overridable through a command-line flag. The two
// addresses without a default stay empty unless set, and main only starts the
// matching listener when its address is non-empty.
var (
	grpcAddr      = ":10051" // gRPC without transport credentials
	grpcsAddr     = ":10052" // gRPC over TLS
	grpcsMtlsAddr string     // gRPC over mutual TLS
	grpcHTTPAddr  string     // HTTP and gRPC served from one listener
)

// Paths to the TLS material. caFilePath has no default; it is read only by
// the mutual-TLS listener.
var (
	crtFilePath = "../t/cert/apisix.crt"
	keyFilePath = "../t/cert/apisix.key"
	caFilePath  string
)
| |
| func init() { |
| flag.StringVar(&grpcAddr, "grpc-address", grpcAddr, "address for grpc") |
| flag.StringVar(&grpcsAddr, "grpcs-address", grpcsAddr, "address for grpcs") |
| flag.StringVar(&grpcsMtlsAddr, "grpcs-mtls-address", grpcsMtlsAddr, "address for grpcs in mTLS") |
| flag.StringVar(&grpcHTTPAddr, "grpc-http-address", grpcHTTPAddr, "addresses for http and grpc services at the same time") |
| flag.StringVar(&crtFilePath, "crt", crtFilePath, "path to certificate") |
| flag.StringVar(&keyFilePath, "key", keyFilePath, "path to key") |
| flag.StringVar(&caFilePath, "ca", caFilePath, "path to ca") |
| } |
| |
| // server is used to implement helloworld.GreeterServer. |
| type server struct { |
| // Embed the unimplemented server |
| pb.UnimplementedGreeterServer |
| pb.UnimplementedTestImportServer |
| pb.UnimplementedEchoServer |
| } |
| |
| // SayHello implements helloworld.GreeterServer |
| func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { |
| log.Printf("Received: %v", in.Name) |
| log.Printf("Enum Gender: %v", in.GetGender()) |
| msg := "Hello " + in.Name |
| |
| person := in.GetPerson() |
| if person != nil { |
| if person.GetName() != "" { |
| msg += fmt.Sprintf(", name: %v", person.GetName()) |
| } |
| if person.GetAge() != 0 { |
| msg += fmt.Sprintf(", age: %v", person.GetAge()) |
| } |
| } |
| |
| return &pb.HelloReply{ |
| Message: msg, |
| Items: in.GetItems(), |
| Gender: in.GetGender(), |
| }, nil |
| } |
| |
| // GetErrResp implements helloworld.GreeterServer |
| func (s *server) GetErrResp(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { |
| st := status.New(codes.Unavailable, "Out of service") |
| st, err := st.WithDetails(&pb.ErrorDetail{ |
| Code: 1, |
| Message: "The server is out of service", |
| Type: "service", |
| }) |
| if err != nil { |
| panic(fmt.Sprintf("Unexpected error attaching metadata: %v", err)) |
| } |
| |
| return nil, st.Err() |
| } |
| |
| func (s *server) SayHelloAfterDelay(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { |
| select { |
| case <-time.After(1 * time.Second): |
| fmt.Println("overslept") |
| case <-ctx.Done(): |
| errStr := ctx.Err().Error() |
| if ctx.Err() == context.DeadlineExceeded { |
| return nil, status.Error(codes.DeadlineExceeded, errStr) |
| } |
| } |
| |
| time.Sleep(1 * time.Second) |
| |
| log.Printf("Received: %v", in.Name) |
| |
| return &pb.HelloReply{Message: "Hello delay " + in.Name}, nil |
| } |
| |
| func (s *server) Plus(ctx context.Context, in *pb.PlusRequest) (*pb.PlusReply, error) { |
| log.Printf("Received: %v %v", in.A, in.B) |
| return &pb.PlusReply{Result: in.A + in.B}, nil |
| } |
| |
| func (s *server) EchoStruct(ctx context.Context, in *pb.StructRequest) (*pb.StructReply, error) { |
| log.Printf("Received: %+v", in) |
| |
| return &pb.StructReply{ |
| Data: in.Data, |
| }, nil |
| } |
| |
| // SayHelloServerStream streams HelloReply back to the client. |
| func (s *server) SayHelloServerStream(req *pb.HelloRequest, stream pb.Greeter_SayHelloServerStreamServer) error { |
| log.Printf("Received server side stream req: %v\n", req) |
| |
| // Say Hello 5 times. |
| for i := 0; i < 5; i++ { |
| if err := stream.Send(&pb.HelloReply{ |
| Message: fmt.Sprintf("Hello %s", req.Name), |
| }); err != nil { |
| return status.Errorf(codes.Unavailable, "Unable to stream request back to client: %v", err) |
| } |
| } |
| return nil |
| } |
| |
| // SayHelloClientStream receives a stream of HelloRequest from a client. |
| func (s *server) SayHelloClientStream(stream pb.Greeter_SayHelloClientStreamServer) error { |
| log.Println("SayHello client side streaming has been initiated.") |
| cache := "" |
| for { |
| req, err := stream.Recv() |
| if err == io.EOF { |
| return stream.SendAndClose(&pb.HelloReply{Message: cache}) |
| } |
| if err != nil { |
| return status.Errorf(codes.Unavailable, "Failed to read client stream: %v", err) |
| } |
| cache = fmt.Sprintf("%sHello %s!", cache, req.Name) |
| } |
| } |
| |
| // SayHelloBidirectionalStream establishes a bidirectional stream with the client. |
| func (s *server) SayHelloBidirectionalStream(stream pb.Greeter_SayHelloBidirectionalStreamServer) error { |
| log.Println("SayHello bidirectional streaming has been initiated.") |
| |
| for { |
| req, err := stream.Recv() |
| if err == io.EOF { |
| return stream.Send(&pb.HelloReply{Message: "stream ended"}) |
| } |
| if err != nil { |
| return status.Errorf(codes.Unavailable, "Failed to read client stream: %v", err) |
| } |
| |
| // A small 0.5 sec sleep |
| time.Sleep(500 * time.Millisecond) |
| |
| if err := stream.Send(&pb.HelloReply{Message: fmt.Sprintf("Hello %s", req.Name)}); err != nil { |
| return status.Errorf(codes.Unknown, "Failed to stream response back to client: %v", err) |
| } |
| } |
| } |
| |
| // SayMultipleHello implements helloworld.GreeterServer |
| func (s *server) SayMultipleHello(ctx context.Context, in *pb.MultipleHelloRequest) (*pb.MultipleHelloReply, error) { |
| log.Printf("Received: %v", in.Name) |
| log.Printf("Enum Gender: %v", in.GetGenders()) |
| msg := "Hello " + in.Name |
| |
| persons := in.GetPersons() |
| if persons != nil { |
| for _, person := range persons { |
| if person.GetName() != "" { |
| msg += fmt.Sprintf(", name: %v", person.GetName()) |
| } |
| if person.GetAge() != 0 { |
| msg += fmt.Sprintf(", age: %v", person.GetAge()) |
| } |
| } |
| } |
| |
| return &pb.MultipleHelloReply{ |
| Message: msg, |
| Items: in.GetItems(), |
| Genders: in.GetGenders(), |
| }, nil |
| } |
| |
| func (s *server) Run(ctx context.Context, in *pb.Request) (*pb.Response, error) { |
| return &pb.Response{Body: in.User.Name + " " + in.Body}, nil |
| } |
| |
| func gRPCAndHTTPFunc(grpcServer *grpc.Server) http.Handler { |
| return h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
| mux := http.NewServeMux() |
| mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { |
| w.Write([]byte("hello http")) |
| }) |
| |
| if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { |
| grpcServer.ServeHTTP(w, r) |
| } else { |
| mux.ServeHTTP(w, r) |
| } |
| }), &http2.Server{}) |
| } |
| |
| func main() { |
| flag.Parse() |
| |
| go func() { |
| lis, err := net.Listen("tcp", grpcAddr) |
| if err != nil { |
| log.Fatalf("failed to listen: %v", err) |
| } |
| s := grpc.NewServer() |
| |
| reflection.Register(s) |
| pb.RegisterGreeterServer(s, &server{}) |
| pb.RegisterTestImportServer(s, &server{}) |
| pb.RegisterEchoServer(s, &server{}) |
| |
| if err := s.Serve(lis); err != nil { |
| log.Fatalf("failed to serve: %v", err) |
| } |
| }() |
| |
| go func() { |
| lis, err := net.Listen("tcp", grpcsAddr) |
| if err != nil { |
| log.Fatalf("failed to listen: %v", err) |
| } |
| |
| c, err := credentials.NewServerTLSFromFile(crtFilePath, keyFilePath) |
| if err != nil { |
| log.Fatalf("credentials.NewServerTLSFromFile err: %v", err) |
| } |
| s := grpc.NewServer(grpc.Creds(c)) |
| reflection.Register(s) |
| pb.RegisterGreeterServer(s, &server{}) |
| if err := s.Serve(lis); err != nil { |
| log.Fatalf("failed to serve: %v", err) |
| } |
| }() |
| |
| if grpcHTTPAddr != "" { |
| go func() { |
| lis, err := net.Listen("tcp", grpcHTTPAddr) |
| if err != nil { |
| log.Fatalf("failed to listen: %v", err) |
| } |
| s := grpc.NewServer() |
| |
| reflection.Register(s) |
| pb.RegisterGreeterServer(s, &server{}) |
| pb.RegisterTestImportServer(s, &server{}) |
| |
| if err := http.Serve(lis, gRPCAndHTTPFunc(s)); err != nil { |
| log.Fatalf("failed to serve grpc: %v", err) |
| } |
| }() |
| } |
| |
| if grpcsMtlsAddr != "" { |
| go func() { |
| lis, err := net.Listen("tcp", grpcsMtlsAddr) |
| if err != nil { |
| log.Fatalf("failed to listen: %v", err) |
| } |
| |
| certificate, err := tls.LoadX509KeyPair(crtFilePath, keyFilePath) |
| if err != nil { |
| log.Fatalf("could not load server key pair: %s", err) |
| } |
| |
| certPool := x509.NewCertPool() |
| ca, err := os.ReadFile(caFilePath) |
| if err != nil { |
| log.Fatalf("could not read ca certificate: %s", err) |
| } |
| |
| if ok := certPool.AppendCertsFromPEM(ca); !ok { |
| log.Fatalf("failed to append client certs") |
| } |
| |
| c := credentials.NewTLS(&tls.Config{ |
| ClientAuth: tls.RequireAndVerifyClientCert, |
| Certificates: []tls.Certificate{certificate}, |
| ClientCAs: certPool, |
| }) |
| s := grpc.NewServer(grpc.Creds(c)) |
| reflection.Register(s) |
| pb.RegisterGreeterServer(s, &server{}) |
| if err := s.Serve(lis); err != nil { |
| log.Fatalf("failed to serve: %v", err) |
| } |
| }() |
| } |
| |
| signals := make(chan os.Signal) |
| signal.Notify(signals, os.Interrupt, syscall.SIGTERM) |
| sig := <-signals |
| log.Printf("get signal %s, exit\n", sig.String()) |
| } |