| // Licensed to Apache Software Foundation (ASF) under one or more contributor |
| // license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright |
| // ownership. Apache Software Foundation (ASF) licenses this file to you under |
| // the Apache License, Version 2.0 (the "License"); you may |
| // not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package boot |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "os" |
| "os/signal" |
| "reflect" |
| "sync" |
| "syscall" |
| "time" |
| |
| "github.com/sirupsen/logrus" |
| |
| "github.com/apache/skywalking-satellite/internal/pkg/log" |
| "github.com/apache/skywalking-satellite/internal/satellite/config" |
| "github.com/apache/skywalking-satellite/internal/satellite/module/api" |
| "github.com/apache/skywalking-satellite/internal/satellite/module/gatherer" |
| "github.com/apache/skywalking-satellite/internal/satellite/module/processor" |
| "github.com/apache/skywalking-satellite/internal/satellite/module/sender" |
| "github.com/apache/skywalking-satellite/internal/satellite/sharing" |
| "github.com/apache/skywalking-satellite/internal/satellite/telemetry" |
| "github.com/apache/skywalking-satellite/plugins" |
| ) |
| |
| // ModuleContainer contains the every running module in each namespace. |
| type ModuleContainer map[string][]api.Module |
| |
| // Start Satellite. |
| func Start(cfg *config.SatelliteConfig, shutdownHookTime time.Duration) error { |
| // Init the global components. |
| log.Init(cfg.Logger) |
| telemetry.Init(cfg.Telemetry) |
| api.ShutdownHookTime = shutdownHookTime |
| // register the supported plugin types to the registry |
| plugins.RegisterPlugins() |
| // use context to receive the external signal. |
| ctx, cancel := context.WithCancel(context.Background()) |
| addShutdownListener(cancel) |
| // initialize the sharing plugins |
| sharing.Load(cfg.Sharing) |
| if err := sharing.Prepare(); err != nil { |
| return fmt.Errorf("error in preparing the sharing plugins: %v", err) |
| } |
| defer sharing.Close() |
| // boot Satellite |
| if modules, err := initModules(cfg); err != nil { |
| return err |
| } else if err := prepareModules(modules); err != nil { |
| return err |
| } else if err := sharing.Start(); err != nil { |
| return err |
| } else { |
| bootModules(ctx, modules) |
| return nil |
| } |
| } |
| |
| // addShutdownListener add a close signal listener. |
| func addShutdownListener(cancel context.CancelFunc) { |
| signals := make(chan os.Signal, 1) |
| signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT) |
| go func() { |
| <-signals |
| cancel() |
| }() |
| } |
| |
| // initModules init the modules and register the modules to the module container. |
| func initModules(cfg *config.SatelliteConfig) (ModuleContainer, error) { |
| log.Logger.Infof("satellite is initializing...") |
| for _, aCfg := range cfg.Pipes { |
| if aCfg.Gatherer == nil || aCfg.Sender == nil || aCfg.Processor == nil { |
| return nil, errors.New("gatherer, sender, and processor is required in the namespace config") |
| } |
| } |
| // container contains the modules in each namespace. |
| container := make(ModuleContainer) |
| for _, aCfg := range cfg.Pipes { |
| // the added sequence should follow gather, sender and processor to purpose the booting sequence. |
| var modules []api.Module |
| g := gatherer.NewGatherer(aCfg.Gatherer) |
| s := sender.NewSender(aCfg.Sender) |
| p := processor.NewProcessor(aCfg.Processor) |
| if err := g.SetProcessor(p); err != nil { |
| return nil, err |
| } |
| if err := p.SetGatherer(g); err != nil { |
| return nil, err |
| } |
| if err := p.SetSender(s); err != nil { |
| return nil, err |
| } |
| if err := s.SetGatherer(g); err != nil { |
| return nil, err |
| } |
| modules = append(modules, g, s, p) |
| container[aCfg.PipeCommonConfig.PipeName] = modules |
| } |
| return container, nil |
| } |
| |
| // prepareModules makes that all modules are in a bootable state. |
| func prepareModules(container ModuleContainer) error { |
| log.Logger.Infof("satellite is prepare to start...") |
| var preparedModules []api.Module |
| for ns, modules := range container { |
| for _, m := range modules { |
| preparedModules = append(preparedModules, m) |
| if err := m.Prepare(); err != nil { |
| for _, preparedModule := range preparedModules { |
| preparedModule.Shutdown() |
| } |
| log.Logger.WithFields(logrus.Fields{ |
| "pipe": ns, |
| "module": reflect.TypeOf(m).String(), |
| }).Errorf("error in preparing stage: %v", err) |
| return err |
| } |
| } |
| } |
| return nil |
| } |
| |
| // bootModules boot all modules. |
| func bootModules(ctx context.Context, container ModuleContainer) { |
| log.Logger.Infof("satellite is starting...") |
| var wg sync.WaitGroup |
| for _, modules := range container { |
| wg.Add(len(modules)) |
| for _, m := range modules { |
| m := m |
| go func() { |
| defer wg.Done() |
| m.Boot(ctx) |
| }() |
| } |
| } |
| wg.Wait() |
| } |