blob: 00cc4dc353601fd2ff6b1f4736a5f7644e39a9ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package grpc
import (
"crypto/tls"
"errors"
"fmt"
"github.com/go-mesh/mesher/common"
"github.com/go-mesh/mesher/resolver"
"net"
"strings"
chassisCom "github.com/go-chassis/go-chassis/core/common"
chassisConfig "github.com/go-chassis/go-chassis/core/config"
"github.com/go-chassis/go-chassis/core/lager"
"github.com/go-chassis/go-chassis/core/server"
chassisTLS "github.com/go-chassis/go-chassis/core/tls"
"github.com/go-mesh/mesher/pkg/runtime"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// protocolName is the protocol identifier of this server. It is the name the
// plugin is installed under, the protocol passed to TLS configuration lookups,
// and the prefix used in listener log messages.
const protocolName = "grpc"
// init installs newServer as the server plugin for protocolName, so the
// registration happens as a side effect of importing this package.
func init() {
	server.InstallPlugin(protocolName, newServer)
}
// grpcServer is the protocol server installed for protocolName.
type grpcServer struct {
	// opts holds the options handed to newServer; Start reads opts.Address
	// to decide where to listen.
	opts server.Options

	// server is the gRPC server created by listenAndServe. It stays nil until
	// a listener has been started, and Stop treats nil as "nothing to stop".
	server *grpc.Server
}
// newServer builds a grpcServer that remembers opts. No listener is opened
// here; that happens in Start.
func newServer(opts server.Options) server.ProtocolServer {
	srv := new(grpcServer)
	srv.opts = opts
	return srv
}
// Register is a no-op for this server: schema and options are ignored, and
// the result is always an empty string with a nil error.
func (hs *grpcServer) Register(schema interface{}, options ...server.RegisterOption) (string, error) {
	var noID string
	return noID, nil
}
// Start validates the configured listen address and starts the server
// according to runtime.Mode.
//
// The address must be "host:port" where host is a literal IPv4 address;
// anything else is rejected with an error. In sidecar mode the work is
// delegated to startSidecar. Per-host mode is not supported and returns an
// error. Any other mode returns nil without starting a listener.
func (hs *grpcServer) Start() error {
	addr := hs.opts.Address

	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return err
	}

	// Only literal IPv4 hosts are accepted: host names and IPv6 are refused.
	parsed := net.ParseIP(host)
	switch {
	case parsed == nil:
		return fmt.Errorf("IP format error, input is [%s]", addr)
	case parsed.To4() == nil:
		return fmt.Errorf("only support ipv4, input is [%s]", addr)
	}

	switch runtime.Mode {
	case common.ModeSidecar:
		return hs.startSidecar(host, port)
	case common.ModePerHost:
		return errors.New("do not support per host")
	}
	return nil
}
// startSidecar starts the listeners used in sidecar mode.
//
// A listener on 127.0.0.1:port is always started with LocalRequestHandler,
// and resolver.SelfEndpoint is set to that loopback address. What happens
// next depends on host:
//
//   - "0.0.0.0" is rejected with an error before anything is started, so a
//     refused configuration leaves no listener running and does not touch
//     resolver.SelfEndpoint.
//   - "127.0.0.1" starts only the loopback listener and logs a warning that
//     the mesher can then only proxy for a consumer.
//   - any other host additionally starts a listener on hs.opts.Address with
//     RemoteRequestHandler.
//
// TLS settings are looked up separately for each listener: under
// common.ComponentName for the loopback one and under
// chassisConfig.SelfServiceName for the external one. A missing SSL
// configuration is not an error; any other lookup error is returned.
func (hs *grpcServer) startSidecar(host, port string) error {
	const loopback = "127.0.0.1"

	// Validate first: there is no point in binding the loopback listener for
	// a configuration that is going to be refused anyway.
	if host == "0.0.0.0" {
		return errors.New("in sidecar mode, forbidden to listen on 0.0.0.0")
	}

	localTLS, err := providerTLSConfig(common.ComponentName)
	if err != nil {
		return err
	}
	localAddr := loopback + ":" + port
	if err := hs.listenAndServe(localAddr, localTLS, LocalRequestHandler); err != nil {
		return err
	}
	resolver.SelfEndpoint = localAddr

	if host == loopback {
		lager.Logger.Warnf("Mesher listen on 127.0.0.1, it can only proxy for consumer. " +
			"for provider, mesher must listen on external ip.")
		return nil
	}

	remoteTLS, err := providerTLSConfig(chassisConfig.SelfServiceName)
	if err != nil {
		return err
	}
	return hs.listenAndServe(hs.opts.Address, remoteTLS, RemoteRequestHandler)
}

// providerTLSConfig looks up the provider-side TLS configuration of
// serviceName for protocolName. When a configuration is found, its peer
// verification and cipher plugin settings are logged under a tag built from
// the same (service, protocol, type) triple used for the lookup. When the
// lookup reports that no SSL configuration exists, the config value from the
// lookup is passed through with a nil error; any other error is returned.
func providerTLSConfig(serviceName string) (*tls.Config, error) {
	tlsConfig, sslConfig, err := chassisTLS.GetTLSConfigByService(
		serviceName, protocolName, chassisCom.Provider)
	if err != nil {
		if chassisTLS.IsSSLConfigNotExist(err) {
			return tlsConfig, nil
		}
		return nil, err
	}

	sslTag := genTag(serviceName, protocolName, chassisCom.Provider)
	lager.Logger.Warnf("%s TLS mode, verify peer: %t, cipher plugin: %s.",
		sslTag, sslConfig.VerifyPeer, sslConfig.CipherPlugin)
	return tlsConfig, nil
}
// listenAndServe binds a TCP listener on addr and serves gRPC on it in a
// background goroutine.
//
// Every call is routed to h through the unknown-service handler, and the
// package's codec is installed as the custom codec. When tlsConfig is
// non-nil the server is created with TLS transport credentials built from it.
//
// The returned error covers only the bind step. A failure of Serve is
// reported asynchronously on server.ErrRuntime.
//
// The server is created and stored in hs.server before the goroutine starts,
// and the goroutine serves through its own local reference. This way a Stop
// issued right after this method returns sees the server, and a later call to
// listenAndServe cannot make this goroutine serve with a different server.
//
// NOTE(review): hs.server holds only the most recently created server, so
// when this method is called more than once, Stop reaches only the last one.
func (hs *grpcServer) listenAndServe(addr string, tlsConfig *tls.Config, h grpc.StreamHandler) error {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}

	serverOpts := make([]grpc.ServerOption, 0, 3)
	if tlsConfig != nil {
		lager.Logger.Info(protocolName + " enable TLS and listen on " + addr)
		serverOpts = append(serverOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
	} else {
		lager.Logger.Info(protocolName + " listen on " + addr)
	}
	serverOpts = append(serverOpts,
		grpc.UnknownServiceHandler(h),
		grpc.CustomCodec(&codec{}))

	srv := grpc.NewServer(serverOpts...)
	hs.server = srv

	go func() {
		if err := srv.Serve(ln); err != nil {
			server.ErrRuntime <- err
		}
	}()
	return nil
}
// Stop shuts the gRPC server down through GracefulStop. If no server has
// been created yet there is nothing to do. Either way the outcome is logged
// and nil is returned.
func (hs *grpcServer) Stop() error {
	if srv := hs.server; srv != nil {
		srv.GracefulStop()
		lager.Logger.Info("grpc server gracefully stopped")
		return nil
	}
	lager.Logger.Info("grpc server doesn't need to be stopped")
	return nil
}
// String reports the protocol name of this server, i.e. protocolName.
func (hs *grpcServer) String() string {
	name := protocolName
	return name
}
// genTag builds a dotted tag from its arguments by joining them with ".".
// With no arguments the result is the empty string.
func genTag(s ...string) string {
	const separator = "."
	tag := strings.Join(s, separator)
	return tag
}