| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package dubbo |
| |
| import ( |
| "errors" |
| "sync" |
| ) |
| |
| import ( |
| "github.com/dubbogo/gost/log/logger" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/client" |
| "dubbo.apache.org/dubbo-go/v3/common" |
| "dubbo.apache.org/dubbo-go/v3/server" |
| ) |
| |
| var ( |
| consumerServices = map[string]*client.ClientDefinition{} |
| conLock sync.RWMutex |
| providerServices = map[string]*server.ServiceDefinition{} |
| proLock sync.RWMutex |
| startOnce sync.Once |
| ) |
| |
| // Instance is the highest layer conception that user could touch. It is mapped from RootConfig. |
| // When users want to inject global configurations and configure common modules for client layer |
| // and server layer, user-side code would be like this: |
| // |
| // ins, err := NewInstance() |
| // cli, err := ins.NewClient() |
| type Instance struct { |
| insOpts *InstanceOptions |
| } |
| |
| // NewInstance receives InstanceOption and initializes RootConfig. There are some processing |
| // tasks during initialization. |
| func NewInstance(opts ...InstanceOption) (*Instance, error) { |
| newInsOpts := defaultInstanceOptions() |
| if err := newInsOpts.init(opts...); err != nil { |
| return nil, err |
| } |
| |
| return &Instance{insOpts: newInsOpts}, nil |
| } |
| |
| // NewClient is like client.NewClient, but inject configurations from RootConfig and |
| // ConsumerConfig |
| func (ins *Instance) NewClient(opts ...client.ClientOption) (*client.Client, error) { |
| if ins == nil || ins.insOpts == nil { |
| return nil, errors.New("Instance has not been initialized") |
| } |
| |
| var cliOpts []client.ClientOption |
| conCfg := ins.insOpts.CloneConsumer() |
| appCfg := ins.insOpts.CloneApplication() |
| regsCfg := ins.insOpts.CloneRegistries() |
| sdCfg := ins.insOpts.CloneShutdown() |
| metricsCfg := ins.insOpts.CloneMetrics() |
| otelCfg := ins.insOpts.CloneOtel() |
| tlsCfg := ins.insOpts.CloneTLSConfig() |
| protocolsCfg := ins.insOpts.CloneProtocols() |
| routersCfg := ins.insOpts.CloneRouter() |
| |
| if conCfg != nil { |
| if !conCfg.Check { |
| cliOpts = append(cliOpts, client.WithClientNoCheck()) |
| } |
| // these options come from Consumer and Root. |
| // for dubbo-go developers, referring config/ConsumerConfig.Init and config/ReferenceConfig |
| cliOpts = append(cliOpts, |
| client.WithClientFilter(conCfg.Filter), |
| // todo(DMwangnima): deal with Protocol |
| client.WithClientRegistryIDs(conCfg.RegistryIDs...), |
| // todo(DMwangnima): deal with TracingKey |
| client.SetClientConsumer(conCfg), |
| ) |
| } |
| if appCfg != nil { |
| cliOpts = append(cliOpts, client.SetClientApplication(appCfg)) |
| } |
| if regsCfg != nil { |
| cliOpts = append(cliOpts, client.SetClientRegistries(regsCfg)) |
| } |
| if sdCfg != nil { |
| cliOpts = append(cliOpts, client.SetClientShutdown(sdCfg)) |
| } |
| if metricsCfg != nil { |
| cliOpts = append(cliOpts, client.SetClientMetrics(metricsCfg)) |
| } |
| if otelCfg != nil { |
| cliOpts = append(cliOpts, client.SetClientOtel(otelCfg)) |
| } |
| if tlsCfg != nil { |
| cliOpts = append(cliOpts, client.SetClientTLS(tlsCfg)) |
| } |
| if protocolsCfg != nil { |
| cliOpts = append(cliOpts, client.SetClientProtocols(protocolsCfg)) |
| } |
| if routersCfg != nil { |
| cliOpts = append(cliOpts, client.SetClientRouters(routersCfg)) |
| } |
| |
| // options passed by users has higher priority |
| cliOpts = append(cliOpts, opts...) |
| cli, err := client.NewClient(cliOpts...) |
| if err != nil { |
| return nil, err |
| } |
| |
| return cli, nil |
| } |
| |
| // NewServer is like server.NewServer, but inject configurations from RootConfig. |
| func (ins *Instance) NewServer(opts ...server.ServerOption) (*server.Server, error) { |
| if ins == nil || ins.insOpts == nil { |
| return nil, errors.New("Instance has not been initialized") |
| } |
| |
| var srvOpts []server.ServerOption |
| appCfg := ins.insOpts.CloneApplication() |
| regsCfg := ins.insOpts.CloneRegistries() |
| prosCfg := ins.insOpts.CloneProtocols() |
| sdCfg := ins.insOpts.CloneShutdown() |
| metricsCfg := ins.insOpts.CloneMetrics() |
| otelCfg := ins.insOpts.CloneOtel() |
| tlsCfg := ins.insOpts.CloneTLSConfig() |
| protocolsCfg := ins.insOpts.CloneProtocols() |
| |
| if appCfg != nil { |
| srvOpts = append(srvOpts, |
| server.SetServerApplication(appCfg), |
| //server.WithServer_ApplicationConfig( |
| // global.WithApplication_Name(appCfg.Name), |
| // global.WithApplication_Organization(appCfg.Organization), |
| // global.WithApplication_Module(appCfg.Module), |
| // global.WithApplication_Version(appCfg.Version), |
| // global.WithApplication_Owner(appCfg.Owner), |
| // global.WithApplication_Environment(appCfg.Environment), |
| //), |
| ) |
| } |
| if regsCfg != nil { |
| srvOpts = append(srvOpts, server.SetServerRegistries(regsCfg)) |
| } |
| if prosCfg != nil { |
| srvOpts = append(srvOpts, server.SetServerProtocols(prosCfg)) |
| } |
| if sdCfg != nil { |
| srvOpts = append(srvOpts, server.SetServerShutdown(sdCfg)) |
| } |
| if metricsCfg != nil { |
| srvOpts = append(srvOpts, server.SetServerMetrics(metricsCfg)) |
| } |
| if otelCfg != nil { |
| srvOpts = append(srvOpts, server.SetServerOtel(otelCfg)) |
| } |
| if tlsCfg != nil { |
| srvOpts = append(srvOpts, server.SetServerTLS(tlsCfg)) |
| } |
| if protocolsCfg != nil { |
| srvOpts = append(srvOpts, server.SetServerProtocols(protocolsCfg)) |
| } |
| |
| // options passed by users have higher priority |
| srvOpts = append(srvOpts, opts...) |
| |
| srv, err := server.NewServer(srvOpts...) |
| |
| if err != nil { |
| return nil, err |
| } |
| return srv, nil |
| } |
| |
| func (ins *Instance) start() (err error) { |
| startOnce.Do(func() { |
| if err = ins.loadConsumer(); err != nil { |
| return |
| } |
| if err = ins.loadProvider(); err != nil { |
| return |
| } |
| }) |
| return err |
| } |
| |
| // loadProvider loads and initializes the service provider |
| // Flow: |
| // 1. Configure server options with Provider settings if available |
| // 2. Create server instance with configured options |
| // 3. Register all defined provider services (both IDL and non-IDL modes) |
| // 4. Start server in a separate goroutine to handle incoming requests |
| func (ins *Instance) loadProvider() error { |
| var err error |
| var srvOpts []server.ServerOption |
| // Step 1: Build server options - add Provider configuration if exists |
| if ins.insOpts.Provider != nil { |
| srvOpts = append(srvOpts, server.SetServerProvider(ins.insOpts.Provider)) |
| } |
| srv, err := ins.NewServer(srvOpts...) |
| if err != nil { |
| return err |
| } |
| // Step 2: Register all defined provider services |
| proLock.RLock() |
| defer proLock.RUnlock() |
| for _, definition := range providerServices { |
| // Step 3: Register service based on mode |
| if definition.Info != nil { |
| // IDL mode: Register with service information (interface definition available) |
| err = srv.Register(definition.Handler, definition.Info, definition.Opts...) |
| } else { |
| // Non-IDL mode: Register service handler directly (no interface definition) |
| err = srv.RegisterService(definition.Handler, definition.Opts...) |
| } |
| if err != nil { |
| return err |
| } |
| } |
| // Step 4: Start server |
| go func() { |
| if err = srv.Serve(); err != nil { |
| logger.Fatalf("Failed to start server, err: %v", err) |
| } |
| }() |
| return nil |
| } |
| |
| // loadConsumer loads the service consumer. |
| func (ins *Instance) loadConsumer() error { |
| cli, err := ins.NewClient() |
| if err != nil { |
| return err |
| } |
| // refer services |
| conLock.RLock() |
| defer conLock.RUnlock() |
| for intfName, definition := range consumerServices { |
| var ( |
| conn *client.Connection |
| dialErr error |
| ) |
| if definition.Info != nil { |
| conn, dialErr = cli.DialWithDefinition(intfName, definition) |
| definition.Info.ConnectionInjectFunc(definition.Svc, conn) |
| } else { |
| // default use msgpack |
| conn, dialErr = cli.NewService(definition.Svc) |
| consumerServices[common.GetReference(definition.Svc)].SetConnection(conn) |
| } |
| if dialErr != nil { |
| return dialErr |
| } |
| } |
| return nil |
| } |
| |
| // SetConsumerServiceWithInfo sets the consumer service with the client information. |
| func SetConsumerServiceWithInfo(svc common.RPCService, info *client.ClientInfo) { |
| conLock.Lock() |
| defer conLock.Unlock() |
| consumerServices[info.InterfaceName] = &client.ClientDefinition{ |
| Svc: svc, |
| Info: info, |
| } |
| } |
| |
| // SetProviderServiceWithInfo sets the provider service with the server information. |
| func SetProviderServiceWithInfo(svc common.RPCService, info *common.ServiceInfo) { |
| proLock.Lock() |
| defer proLock.Unlock() |
| providerServices[info.InterfaceName] = &server.ServiceDefinition{ |
| Handler: svc, |
| Info: info, |
| } |
| } |
| |
| func SetConsumerService(svc common.RPCService) { |
| conLock.Lock() |
| defer conLock.Unlock() |
| consumerServices[common.GetReference(svc)] = &client.ClientDefinition{ |
| Svc: svc, |
| } |
| } |
| |
| func SetProviderService(svc common.RPCService) { |
| proLock.Lock() |
| defer proLock.Unlock() |
| providerServices[common.GetReference(svc)] = &server.ServiceDefinition{ |
| Handler: svc, |
| } |
| } |
| |
| func GetConsumerConnection(interfaceName string) (*client.Connection, error) { |
| conLock.RLock() |
| defer conLock.RUnlock() |
| return consumerServices[interfaceName].GetConnection() |
| } |