blob: eacb0c391dba29b2eb1d1fe3d913b16cb669926f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package server provides APIs for registering services and starting an RPC server.
package server
import (
"context"
"reflect"
"sort"
"strconv"
"sync"
)
import (
"github.com/dubbogo/gost/log/logger"
"github.com/pkg/errors"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/dubboutil"
"dubbo.apache.org/dubbo-go/v3/metadata"
"dubbo.apache.org/dubbo-go/v3/metrics/probe"
"dubbo.apache.org/dubbo-go/v3/registry/exposed_tmp"
)
// Package-level registry of dubbo internal services.
//
// internalProServices collects the services handed to SetProviderServices;
// internalProLock is held while the slice is appended to or iterated.
var (
	internalProLock     sync.Mutex
	internalProServices = make([]*InternalService, 0, 16)
)
// Server holds the server-level options together with the services that have
// been registered on it.
type Server struct {
// cfg is the server-level configuration; genSvcOpts returns an error while it is nil.
cfg *ServerOptions
// mu guards svcOptsMap, interfaceNameServices and serve.
mu sync.RWMutex
// svcOptsMap indexes the registered services by ServiceOptions.Id.
svcOptsMap map[string]*ServiceOptions
// interfaceNameServices indexes the registered services by interface name.
// Only services with a non-empty Service.Interface are recorded here.
interfaceNameServices map[string]*ServiceOptions
// serve indicates whether Serve has already been called.
serve bool
}
// ServiceInfo is an alias of common.ServiceInfo.
//
// Deprecated: kept only for compatibility with previously generated pb.go files.
type ServiceInfo = common.ServiceInfo
// MethodInfo is an alias of common.MethodInfo.
//
// Deprecated: kept only for compatibility with previously generated pb.go files.
type MethodInfo = common.MethodInfo
// ServiceDefinition is what an InternalService's Init callback returns: the
// pieces the server needs to build the ServiceOptions of that service.
type ServiceDefinition struct {
// Handler is the service implementation handed to genSvcOpts.
Handler any
// Info is the service info handed to genSvcOpts.
Info *common.ServiceInfo
// Opts are the service options handed to genSvcOpts.
Opts []ServiceOption
}
// Register registers handler, described by info, as an IDL-mode service.
// Each call builds one ServiceOptions from the server configuration plus opts
// and records it on the server.
func (s *Server) Register(handler any, info *common.ServiceInfo, opts ...ServiceOption) error {
	mode := constant.IDL
	return s.registerWithMode(handler, info, mode, opts...)
}
// RegisterService registers handler as a non-IDL (Triple) service.
// No ServiceInfo is supplied; the interface name is derived from the handler.
func (s *Server) RegisterService(handler any, opts ...ServiceOption) error {
	mode := constant.NONIDL
	return s.registerWithMode(handler, nil, mode, opts...)
}
// registerWithMode is the shared registration path behind Register and
// RegisterService. It prepends the mode-specific options to the caller's
// options, generates the ServiceOptions and stores them on the server.
func (s *Server) registerWithMode(handler any, info *common.ServiceInfo, idlMode string, opts ...ServiceOption) error {
	merged := make([]ServiceOption, 0, len(opts)+2)
	merged = append(merged, WithIDLMode(idlMode))
	if idlMode == constant.NONIDL {
		// The interface has to be set explicitly only in NONIDL mode.
		merged = append(merged, WithInterface(common.GetReference(handler)))
	}
	// Caller options come last so they can override the defaults above.
	merged = append(merged, opts...)

	generated, err := s.genSvcOpts(handler, info, merged...)
	if err != nil {
		return err
	}

	s.registerServiceOptions(generated)
	return nil
}
// genSvcOpts builds and initializes the ServiceOptions for handler.
//
// Options are applied in increasing priority: the server-level application,
// provider, protocols and registries configuration, then the matching entry of
// provider.services (if any), and finally the options passed by the caller.
//
// The provider.services entry is looked up by the handler reference name
// first; if no such key exists, the entries are searched for one whose
// Interface equals that name. When several entries declare the same interface
// the one with the smallest key is used, so the choice does not depend on map
// iteration order.
//
// It returns an error when the server has no configuration or when
// initializing the options fails.
func (s *Server) genSvcOpts(handler any, info *common.ServiceInfo, opts ...ServiceOption) (*ServiceOptions, error) {
	if s.cfg == nil {
		return nil, errors.New("Server has not been initialized, please use NewServer() to create Server")
	}

	var svcOpts []ServiceOption
	appCfg := s.cfg.Application
	proCfg := s.cfg.Provider
	prosCfg := s.cfg.Protocols
	regsCfg := s.cfg.Registries

	// Log the service being registered for debugging and monitoring.
	interfaceName := common.GetReference(handler)
	logger.Infof("Registering service: %s", interfaceName)

	newSvcOpts := defaultServiceOptions()
	if appCfg != nil {
		svcOpts = append(svcOpts, SetApplication(appCfg))
	}
	if proCfg != nil {
		svcOpts = append(svcOpts, SetProvider(proCfg))
	}
	if prosCfg != nil {
		svcOpts = append(svcOpts, SetProtocols(prosCfg))
	}
	if regsCfg != nil {
		svcOpts = append(svcOpts, SetRegistries(regsCfg))
	}

	// Pick up service-level configuration from provider.services.
	if proCfg != nil && proCfg.Services != nil {
		// Prefer an exact hit on the handler reference name.
		svcCfg, ok := proCfg.Services[interfaceName]
		if !ok {
			// Fallback: match on the configured interface. Map iteration order
			// is unspecified, so keep the match with the smallest key to make
			// the result stable; nil entries are skipped instead of
			// dereferenced.
			var matchedKey string
			for key, cfg := range proCfg.Services {
				if cfg == nil || cfg.Interface != interfaceName {
					continue
				}
				if svcCfg == nil || key < matchedKey {
					svcCfg, matchedKey = cfg, key
				}
			}
		}

		if svcCfg != nil {
			svcOpts = append(svcOpts, SetService(svcCfg))
			logger.Infof("Injected options from provider.services for %s", interfaceName)
		} else if len(proCfg.Services) > 0 {
			// Only warn if services are configured but none match; this avoids
			// noise when the server API is used without config files.
			logger.Warnf("No matching service config found for [%s]", interfaceName)
		}
	}

	// Options passed by users have higher priority.
	svcOpts = append(svcOpts, opts...)
	if err := newSvcOpts.init(s, svcOpts...); err != nil {
		return nil, err
	}

	svcConf := newSvcOpts.Service
	if info != nil {
		if svcConf.Interface == "" {
			svcConf.Interface = info.InterfaceName
		}
		newSvcOpts.info = info
	}
	newSvcOpts.Id = interfaceName
	newSvcOpts.Implement(handler)
	// Backfill missing MethodFunc entries so reflection-based invocation works.
	newSvcOpts.info = enhanceServiceInfo(info)
	return newSvcOpts, nil
}
// isNillable reports whether v's kind is one for which reflect.Value.IsNil may
// be called: chan, func, interface, map, pointer, slice or unsafe pointer.
func isNillable(v reflect.Value) bool {
	kind := v.Kind()
	if kind == reflect.Pointer || kind == reflect.Interface {
		return true
	}
	if kind == reflect.Map || kind == reflect.Slice {
		return true
	}
	return kind == reflect.Chan || kind == reflect.Func || kind == reflect.UnsafePointer
}
// isReflectNil reports whether v holds nil. Values whose kind cannot be nil
// yield false without IsNil ever being called on them.
func isReflectNil(v reflect.Value) bool {
	if !isNillable(v) {
		return false
	}
	return v.IsNil()
}
// CallMethodByReflection invokes method through reflection and normalizes its
// return values. It is a shared helper used by both server/server.go and
// protocol/triple/server.go.
//
// The call is made as method.Func(handler, ctx, args...). An untyped nil in
// args is replaced by the zero value of the corresponding parameter type,
// because reflect.Value.Call panics when handed a zero reflect.Value.
//
// Return values are mapped as follows:
//   - no return value: (nil, nil);
//   - one return value: it is reported as the error if it is a non-nil error,
//     otherwise (nil, nil);
//   - two or more: the first is the result (nil when it holds nil), the second
//     is the error when it is a non-nil error.
//
// A mismatch between args and the method signature still panics inside
// reflect.Value.Call.
func CallMethodByReflection(ctx context.Context, method reflect.Method, handler any, args []any) (any, error) {
	fnType := method.Func.Type()

	in := make([]reflect.Value, 0, len(args)+2)
	in = append(in, reflect.ValueOf(handler), reflect.ValueOf(ctx))
	for i, arg := range args {
		v := reflect.ValueOf(arg)
		if !v.IsValid() {
			// Parameters 0 and 1 are the receiver and ctx, so args[i] maps to
			// parameter i+2.
			if pt := reflectParamType(fnType, i+2); pt != nil {
				v = reflect.Zero(pt)
			}
		}
		in = append(in, v)
	}

	returnValues := method.Func.Call(in)

	switch len(returnValues) {
	case 0:
		// Nothing to report; indexing returnValues here would panic.
		return nil, nil
	case 1:
		if isReflectNil(returnValues[0]) {
			return nil, nil
		}
		if err, ok := returnValues[0].Interface().(error); ok {
			return nil, err
		}
		return nil, nil
	}

	var result any
	var err error
	if !isReflectNil(returnValues[0]) {
		result = returnValues[0].Interface()
	}
	if !isReflectNil(returnValues[1]) {
		if e, ok := returnValues[1].Interface().(error); ok {
			err = e
		}
	}
	return result, err
}

// reflectParamType returns the type expected for the argument at position idx
// of a call to a function of type fnType, unwrapping the element type for
// variadic positions. It returns nil when idx is beyond the parameter list.
func reflectParamType(fnType reflect.Type, idx int) reflect.Type {
	n := fnType.NumIn()
	if fnType.IsVariadic() && idx >= n-1 {
		return fnType.In(n - 1).Elem()
	}
	if idx < n {
		return fnType.In(idx)
	}
	return nil
}
// createReflectionMethodFunc wraps method in a function with the MethodFunc
// shape (ctx, args, handler); calling it forwards to CallMethodByReflection.
func createReflectionMethodFunc(method reflect.Method) func(ctx context.Context, args []any, handler any) (any, error) {
	invoke := func(ctx context.Context, args []any, handler any) (any, error) {
		return CallMethodByReflection(ctx, method, handler, args)
	}
	return invoke
}
// enhanceServiceInfo backfills, via reflection, every entry of info.Methods
// that has no MethodFunc. info is modified in place and returned; a nil info
// or an info without ServiceType is returned untouched.
//
// Case-insensitive Triple routing is handled in the transport-layer route mux,
// but lowercase-first ServiceInfo method names still need a MethodFunc so that
// reflection-based invocation can reach the exported Go method.
func enhanceServiceInfo(info *common.ServiceInfo) *common.ServiceInfo {
	if info == nil || info.ServiceType == nil {
		return info
	}

	svcType := reflect.TypeOf(info.ServiceType)

	// Index the methods under their own name and under the name with the case
	// of the first rune swapped (for example "SayHello" and "sayHello"), so
	// both spellings resolve without duplicating metadata.
	byName := make(map[string]reflect.Method)
	total := svcType.NumMethod()
	for idx := 0; idx < total; idx++ {
		method := svcType.Method(idx)
		byName[method.Name] = method
		byName[dubboutil.SwapCaseFirstRune(method.Name)] = method
	}

	// Only entries that do not already carry a MethodFunc are filled in.
	for idx := range info.Methods {
		if info.Methods[idx].MethodFunc != nil {
			continue
		}
		method, found := byName[info.Methods[idx].Name]
		if !found {
			continue
		}
		info.Methods[idx].MethodFunc = createReflectionMethodFunc(method)
	}
	return info
}
// exportServices exports every registered service. It stops at the first
// failure, logs it and returns the error wrapped with the interface name of
// the failing service.
func (s *Server) exportServices() error {
	// The read lock protects svcOptsMap while it is iterated.
	s.mu.RLock()
	defer s.mu.RUnlock()

	for _, registered := range s.svcOptsMap {
		err := registered.Export()
		if err == nil {
			continue
		}
		iface := registered.Service.Interface
		logger.Errorf("export %s service failed, err: %s", iface, err)
		return errors.Wrapf(err, "failed to export service %s", iface)
	}
	return nil
}
// Serve starts the server and, on success, blocks forever.
//
// In order it initializes the registry metadata report and the metadata
// options, exports the registered services, exports the internal services,
// registers the service instance and finally flips the startup/readiness
// probes. The first failing step aborts the start and its error is returned.
//
// Serve may be called once per Server: a second call returns an error. The
// started flag is set before the start-up steps run and is not reset when one
// of them fails. Calling Serve on a Server without configuration returns an
// error instead of dereferencing nil.
func (s *Server) Serve() error {
	// Mirror the guard in genSvcOpts: a Server that was not built with
	// NewServer has no configuration to start from. The checks run before the
	// started flag is touched so a misconfigured server is not marked started.
	if s.cfg == nil {
		return errors.New("Server has not been initialized, please use NewServer() to create Server")
	}
	if s.cfg.Application == nil {
		return errors.New("application config has not been set on the server")
	}

	s.mu.Lock()
	if s.serve {
		// Release the lock before returning to avoid a deadlock.
		s.mu.Unlock()
		return errors.New("server has already been started")
	}
	// Prevent multiple calls to Serve.
	s.serve = true
	// Release the lock before exporting: exportServices takes it again.
	s.mu.Unlock()

	// The registry config in ServiceOptions and ServerOptions both need a
	// metadata reporter; when ServiceOptions.init() runs it is not yet known
	// whether a new registry config will be set later through a service option.
	if err := metadata.InitRegistryMetadataReport(s.cfg.Registries); err != nil {
		return err
	}
	metadataOpts := metadata.NewOptions(
		metadata.WithAppName(s.cfg.Application.Name),
		metadata.WithMetadataType(s.cfg.Application.MetadataType),
		metadata.WithPort(getMetadataPort(s.cfg)),
		metadata.WithMetadataProtocol(s.cfg.Application.MetadataServiceProtocol),
	)
	if err := metadataOpts.Init(); err != nil {
		return err
	}

	if err := s.exportServices(); err != nil {
		return err
	}
	if err := s.exportInternalServices(); err != nil {
		return err
	}
	if err := exposed_tmp.RegisterServiceInstance(); err != nil {
		return err
	}

	// k8s probe ready
	probe.SetStartupComplete(true)
	probe.SetReady(true)

	// Block the calling goroutine for the lifetime of the process.
	select {}
}
// exportInternalServices exports the dubbo internal services registered
// through SetProviderServices.
//
// Every registered service has its Init callback invoked with a ServiceOptions
// carrying the server-level application, provider, protocols and registries
// configuration. Services whose Init reports false are skipped. The remaining
// ones are exported in ascending Priority order; services with equal priority
// keep their registration order. BeforeExport and AfterExport, when set, are
// called around each export, and AfterExport also receives the export error.
//
// It returns an error when a service has no Init callback, when generating its
// options fails, or when an export fails.
func (s *Server) exportInternalServices() error {
	cfg := &ServiceOptions{}
	cfg.Application = s.cfg.Application
	cfg.Provider = s.cfg.Provider
	cfg.Protocols = s.cfg.Protocols
	cfg.Registries = s.cfg.Registries

	// Take the lock before touching internalProServices at all: even reading
	// its length races with a concurrent SetProviderServices.
	internalProLock.Lock()
	defer internalProLock.Unlock()

	services := make([]*InternalService, 0, len(internalProServices))
	for _, service := range internalProServices {
		if service.Init == nil {
			return errors.New("[internal service]internal service init func is empty, please set the init func correctly")
		}
		sd, ok := service.Init(cfg)
		if !ok {
			logger.Infof("[internal service]%s service will not expose", service.Name)
			continue
		}
		newSvcOpts, err := s.genSvcOpts(sd.Handler, sd.Info, sd.Opts...)
		if err != nil {
			return err
		}
		service.svcOpts = newSvcOpts
		service.info = sd.Info
		services = append(services, service)
	}

	// A stable sort keeps the registration order among services that share a
	// priority, so the export order is the same on every run.
	sort.SliceStable(services, func(i, j int) bool {
		return services[i].Priority < services[j].Priority
	})

	for _, service := range services {
		if service.BeforeExport != nil {
			service.BeforeExport(service.svcOpts)
		}
		err := service.svcOpts.Export()
		// AfterExport runs before the error check so it also sees failures.
		if service.AfterExport != nil {
			service.AfterExport(service.svcOpts, err)
		}
		if err != nil {
			logger.Errorf("[internal service]export %s service failed, err: %s", service.Name, err)
			return err
		}
	}
	return nil
}
// InternalService describes a dubbo internal service that the server exports
// in addition to the user-registered services.
type InternalService struct {
// Name is the internal service name. It is required: SetProviderServices
// ignores a service whose Name is empty.
Name string
// svcOpts holds the ServiceOptions generated for this service during export.
svcOpts *ServiceOptions
// info holds the service info returned by Init.
info *common.ServiceInfo
// Init is required. It receives a ServiceOptions carrying the server-level
// configuration and returns the service definition together with a bool
// that reports whether the service should be exported.
Init func(options *ServiceOptions) (*ServiceDefinition, bool)
// BeforeExport, when set, is called with InternalService.svcOpts right
// before the service is exported.
BeforeExport func(options *ServiceOptions)
// AfterExport, when set, is called with InternalService.svcOpts and the
// error returned by the export right after the export attempt.
AfterExport func(options *ServiceOptions, err error)
// Priority controls the order of service exposure.
// Lower numbers have higher priority and are exported first.
// The default priority is 0.
// The metadata service is exposed at the end.
// Leave it unset when the order of service exposure does not matter.
Priority int
}
// getMetadataPort resolves the port to pass to the metadata options.
//
// The application-level MetadataServicePort takes precedence; when it is empty
// the port of the protocol registered under constant.DefaultProtocol is used.
// It returns 0 when no port is configured or when the configured value is not
// a valid integer; in the latter case the parse error is logged.
func getMetadataPort(opts *ServerOptions) int {
	port := opts.Application.MetadataServicePort
	if port == "" {
		if protocolConfig, ok := opts.Protocols[constant.DefaultProtocol]; ok {
			port = protocolConfig.Port
		}
	}
	if port == "" {
		return 0
	}

	p, err := strconv.Atoi(port)
	if err != nil {
		// Errorf, not Error: the message contains a formatting verb.
		logger.Errorf("MetadataService port parse error %v, MetadataService will use random port", err)
		return 0
	}
	return p
}
// NewServer creates a Server from the default server options with opts
// applied on top. It returns an error when initializing the options fails.
// The returned server starts with empty service registries.
func NewServer(opts ...ServerOption) (*Server, error) {
	srvOpts := defaultServerOptions()
	err := srvOpts.init(opts...)
	if err != nil {
		return nil, err
	}

	return &Server{
		cfg:                   srvOpts,
		svcOptsMap:            map[string]*ServiceOptions{},
		interfaceNameServices: map[string]*ServiceOptions{},
	}, nil
}
// SetProviderServices registers sd as a dubbo internal service to be exported
// when a server starts. A nil sd or one without a Name is ignored with a
// warning. The registration is safe for concurrent use.
func SetProviderServices(sd *InternalService) {
	if sd == nil {
		// Guard against a nil pointer instead of panicking on sd.Name.
		logger.Warnf("[internal service]internal service is nil, please set internal service")
		return
	}
	if sd.Name == "" {
		logger.Warnf("[internal service]internal name is empty, please set internal name")
		return
	}

	internalProLock.Lock()
	defer internalProLock.Unlock()
	internalProServices = append(internalProServices, sd)
}
// registerServiceOptions records serviceOptions on the server under its Id
// and, when it declares a non-empty interface, under that interface name too.
// An existing entry with the same key is replaced.
func (s *Server) registerServiceOptions(serviceOptions *ServiceOptions) {
	s.mu.Lock()
	defer s.mu.Unlock()

	id := serviceOptions.Id
	logger.Infof("A provider service %s was registered successfully.", id)
	s.svcOptsMap[id] = serviceOptions

	svc := serviceOptions.Service
	if svc == nil || svc.Interface == "" {
		return
	}
	s.interfaceNameServices[svc.Interface] = serviceOptions
}
// GetServiceOptions returns the ServiceOptions registered under the given
// name/ID, or nil when there is none.
func (s *Server) GetServiceOptions(name string) *ServiceOptions {
	s.mu.RLock()
	found := s.svcOptsMap[name]
	s.mu.RUnlock()
	return found
}
// GetServiceInfo returns the ServiceInfo of the service registered under the
// given name/ID. It returns nil when the service is not found or carries no
// ServiceInfo.
func (s *Server) GetServiceInfo(name string) *common.ServiceInfo {
	s.mu.RLock()
	defer s.mu.RUnlock()

	registered, found := s.svcOptsMap[name]
	if !found {
		return nil
	}
	return registered.info
}
// GetRPCService returns the RPCService implementation of the service
// registered under the given name/ID. It returns nil when the service is not
// found or has no RPCService.
func (s *Server) GetRPCService(name string) common.RPCService {
	s.mu.RLock()
	defer s.mu.RUnlock()

	registered, found := s.svcOptsMap[name]
	if !found {
		return nil
	}
	return registered.rpcService
}
// GetServiceOptionsByInterfaceName returns the ServiceOptions registered for
// the given interface name, or nil when no service declares that interface.
func (s *Server) GetServiceOptionsByInterfaceName(interfaceName string) *ServiceOptions {
	s.mu.RLock()
	found := s.interfaceNameServices[interfaceName]
	s.mu.RUnlock()
	return found
}