blob: 0262869cd95911cd38fca94a814b24c57043449d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package triple
import (
"context"
"reflect"
"sync"
"time"
)
import (
tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
triConfig "github.com/dubbogo/triple/pkg/config"
"github.com/dubbogo/triple/pkg/triple"
"github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/filterchain"
"github.com/apache/dubbo-go-pixiu/pkg/listener"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
)
func init() {
listener.SetListenerServiceFactory(model.ProtocolTypeTriple, newTripleListenerService)
}
type (
// ListenerService the facade of a listener
TripleListenerService struct {
listener.BaseListenerService
server *triple.TripleServer
serviceMap *sync.Map
gShutdownConfig *listener.ListenerGracefulShutdownConfig
}
// ProxyService grpc proxy service definition
ProxyService struct {
reqTypeMap sync.Map
ls *TripleListenerService
}
)
func newTripleListenerService(lc *model.Listener, bs *model.Bootstrap) (listener.ListenerService, error) {
fc := filterchain.CreateNetworkFilterChain(lc.FilterChain)
ls := &TripleListenerService{
BaseListenerService: listener.BaseListenerService{
Config: lc,
FilterChain: fc,
},
gShutdownConfig: &listener.ListenerGracefulShutdownConfig{},
}
opts := []triConfig.OptionFunction{
triConfig.WithCodecType(tripleConstant.HessianCodecName),
triConfig.WithLocation(lc.Address.SocketAddress.GetAddress()),
triConfig.WithLogger(logger.GetLogger()),
triConfig.WithProxyModeEnable(true),
}
triOption := triConfig.NewTripleOption(opts...)
tripleService := &ProxyService{ls: ls}
serviceMap := &sync.Map{}
serviceMap.Store(tripleConstant.ProxyServiceKey, tripleService)
server := triple.NewTripleServer(serviceMap, triOption)
ls.serviceMap = serviceMap
ls.server = server
return ls, nil
}
// Start start triple server
func (ls *TripleListenerService) Start() error {
ls.server.Start()
return nil
}
func (ls *TripleListenerService) Close() error {
ls.server.Stop()
return nil
}
func (ls *TripleListenerService) ShutDown(wg interface{}) error {
timeout := config.GetBootstrap().GetShutdownConfig().GetTimeout()
if timeout <= 0 {
return nil
}
// stop accept request
ls.gShutdownConfig.RejectRequest = true
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) && ls.gShutdownConfig.ActiveCount > 0 {
// sleep 100 ms and check it again
time.Sleep(100 * time.Millisecond)
logger.Infof("waiting for active invocation count = %d", ls.gShutdownConfig.ActiveCount)
}
wg.(*sync.WaitGroup).Done()
ls.server.Stop()
return nil
}
func (ls *TripleListenerService) Refresh(c model.Listener) error {
// There is no need to lock here for now, as there is at most one NetworkFilter
fc := filterchain.CreateNetworkFilterChain(c.FilterChain)
ls.FilterChain = fc
return nil
}
// GetReqParamsInterfaces get params
func (d *ProxyService) GetReqParamsInterfaces(methodName string) ([]interface{}, bool) {
val, ok := d.reqTypeMap.Load(methodName)
if !ok {
return nil, false
}
typs := val.([]reflect.Type)
reqParamsInterfaces := make([]interface{}, 0, len(typs))
for _, typ := range typs {
reqParamsInterfaces = append(reqParamsInterfaces, reflect.New(typ).Interface())
}
return reqParamsInterfaces, true
}
// InvokeWithArgs called when rpc invocation comes
func (d *ProxyService) InvokeWithArgs(ctx context.Context, methodName string, arguments []interface{}) (interface{}, error) {
d.ls.gShutdownConfig.AddActiveCount(1)
defer d.ls.gShutdownConfig.AddActiveCount(-1)
if d.ls.gShutdownConfig.RejectRequest {
return nil, errors.Errorf("Pixiu is preparing to close, reject all new requests")
}
return d.ls.FilterChain.OnTripleData(ctx, methodName, arguments)
}