blob: baa87e5d44879ab6c6ab4f74be03664158767ea4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package router
import (
"fmt"
"net"
)
import (
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
import (
registryv1alpha1 "github.com/apache/dubbo-kubernetes/pkg/bufman/gen/proto/go/registry/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/bufman/handlers/grpc_handlers"
"github.com/apache/dubbo-kubernetes/pkg/bufman/interceptors"
dubbo_cp "github.com/apache/dubbo-kubernetes/pkg/config/app/dubbo-cp"
"github.com/apache/dubbo-kubernetes/pkg/core/logger"
)
type GRPCRouter struct {
PlainServer *grpc.Server
PlainServerPort int
SecureServer *grpc.Server
SecureServerPort int
}
type GrpcServer struct {
PlainServer *grpc.Server
PlainServerPort int
SecureServer *grpc.Server
SecureServerPort int
}
func newGrpcServer(config dubbo_cp.Config) *GRPCRouter {
router := &GRPCRouter{
PlainServerPort: config.Bufman.Server.GrpcPlainPort,
SecureServerPort: config.Bufman.Server.GrpcSecurePort,
}
router.PlainServer = grpc.NewServer(grpc.ChainUnaryInterceptor(interceptors.Auth()))
reflection.Register(router.PlainServer)
router.SecureServer = grpc.NewServer(grpc.ChainUnaryInterceptor(interceptors.Auth()))
reflection.Register(router.SecureServer)
return router
}
func (r *GRPCRouter) NeedLeaderElection() bool {
return false
}
func (r *GRPCRouter) Start(stop <-chan struct{}) error {
plainLis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.PlainServerPort))
if err != nil {
return err
}
secureLis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.SecureServerPort))
if err != nil {
return err
}
plainErrChan := make(chan error)
secureErrChan := make(chan error)
go func() {
defer close(plainErrChan)
if err = r.PlainServer.Serve(plainLis); err != nil {
logger.Sugar().Error(err, "bufman terminated with an error")
plainErrChan <- err
} else {
logger.Sugar().Info("bufman terminated normally")
}
}()
go func() {
defer close(secureErrChan)
if err = r.SecureServer.Serve(secureLis); err != nil {
logger.Sugar().Error(err, "bufman terminated with an error")
secureErrChan <- err
} else {
logger.Sugar().Info("terminated normally")
}
}()
select {
case <-stop:
logger.Sugar().Info("bufman stopping gracefully")
r.PlainServer.GracefulStop()
r.SecureServer.GracefulStop()
return nil
case err := <-secureErrChan:
return err
case err := <-plainErrChan:
return err
}
}
func InitGRPCRouter(config dubbo_cp.Config) *GRPCRouter {
r := newGrpcServer(config)
register(r.PlainServer)
register(r.SecureServer)
return r
}
func register(server *grpc.Server) {
// UserService
registryv1alpha1.RegisterUserServiceServer(server, grpc_handlers.NewUserServiceHandler())
// TokenService
registryv1alpha1.RegisterTokenServiceServer(server, grpc_handlers.NewTokenServiceHandler())
// AuthnService
registryv1alpha1.RegisterAuthnServiceServer(server, grpc_handlers.NewAuthnServiceHandler())
// RepositoryService
registryv1alpha1.RegisterRepositoryServiceServer(server, grpc_handlers.NewRepositoryServiceHandler())
// PushService
registryv1alpha1.RegisterPushServiceServer(server, grpc_handlers.NewPushServiceHandler())
// CommitService
registryv1alpha1.RegisterRepositoryCommitServiceServer(server, grpc_handlers.NewCommitServiceHandler())
// TagService
registryv1alpha1.RegisterRepositoryTagServiceServer(server, grpc_handlers.NewTagServiceHandler())
// ResolveService
registryv1alpha1.RegisterResolveServiceServer(server, grpc_handlers.NewResolveServiceHandler())
// DownloadService
registryv1alpha1.RegisterDownloadServiceServer(server, grpc_handlers.NewDownloadServiceHandler())
// DocService
registryv1alpha1.RegisterDocServiceServer(server, grpc_handlers.NewDocServiceHandler())
}