blob: 1826d2b424eb25dca1df6c15c07288130e4aab2d [file] [log] [blame]
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package run
import (
"fmt"
"os"
"path"
"sync"
"github.com/oklog/run"
"github.com/pkg/errors"
"github.com/spf13/pflag"
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/pkg/config"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
// FlagSet holds a pflag.FlagSet as well as an exported Name variable for
// allowing improved help usage information.
type FlagSet struct {
*pflag.FlagSet
Name string
}
// NewFlagSet returns a new FlagSet for usage in Config objects.
func NewFlagSet(name string) *FlagSet {
return &FlagSet{
FlagSet: pflag.NewFlagSet(name, pflag.ContinueOnError),
Name: name,
}
}
// Unit is the default interface an object needs to implement for it to be able
// to register with a Group.
// Name should return a short but good identifier of the Unit.
type Unit interface {
Name() string
}
// Config interface should be implemented by Group Unit objects that manage
// their own configuration through the use of flags.
// If a Unit's Validate returns an error it will stop the Group immediately.
type Config interface {
// Unit for Group registration and identification
Unit
// FlagSet returns an object's FlagSet
FlagSet() *FlagSet
// Validate checks an object's stored values
Validate() error
}
// PreRunner interface should be implemented by Group Unit objects that need
// a pre run stage before starting the Group Services.
// If a Unit's PreRun returns an error it will stop the Group immediately.
type PreRunner interface {
// Unit for Group registration and identification
Unit
PreRun() error
}
// NewPreRunner takes a name and a standalone pre runner compatible function
// and turns them into a Group compatible PreRunner, ready for registration.
func NewPreRunner(name string, fn func() error) PreRunner {
return preRunner{name: name, fn: fn}
}
type preRunner struct {
name string
fn func() error
}
func (p preRunner) Name() string {
return p.name
}
func (p preRunner) PreRun() error {
return p.fn()
}
type StopNotify <-chan struct{}
// Service interface should be implemented by Group Unit objects that need
// to run a blocking service until an error occurs or a shutdown request is
// made.
// The Serve method must be blocking and return an error on unexpected shutdown.
// Recoverable errors need to be handled inside the service itself.
// GracefulStop must gracefully stop the service and make the Serve call return.
//
// Since Service is managed by Group, it is considered a design flaw to call any
// of the Service methods directly in application code.
type Service interface {
// Unit for Group registration and identification
Unit
// Serve starts the GroupService and blocks.
Serve() StopNotify
// GracefulStop shuts down and cleans up the GroupService.
GracefulStop()
}
// Group builds on https://github.com/oklog/run to provide a deterministic way
// to manage service lifecycles. It allows for easy composition of elegant
// monoliths as well as adding signal handlers, metrics services, etc.
type Group struct {
name string
f *FlagSet
r run.Group
c []Config
p []PreRunner
s []Service
readyCh chan struct{}
log *logger.Logger
showRunGroup bool
configured bool
}
func NewGroup(name string) Group {
return Group{
name: name,
readyCh: make(chan struct{}),
}
}
// Name shows the name of the group
func (g Group) Name() string {
return g.name
}
// Register will inspect the provided objects implementing the Unit interface to
// see if it needs to register the objects for any of the Group bootstrap
// phases. If a Unit doesn't satisfy any of the bootstrap phases it is ignored
// by Group.
// The returned array of booleans is of the same size as the amount of provided
// Units, signalling for each provided Unit if it successfully registered with
// Group for at least one of the bootstrap phases or if it was ignored.
func (g *Group) Register(units ...Unit) []bool {
g.log = logger.GetLogger(g.name)
hasRegistered := make([]bool, len(units))
for idx := range units {
if !g.configured {
// if RunConfig has been called we can no longer register Config
// phases of Units
if c, ok := units[idx].(Config); ok {
g.c = append(g.c, c)
hasRegistered[idx] = true
}
}
if p, ok := units[idx].(PreRunner); ok {
g.p = append(g.p, p)
hasRegistered[idx] = true
}
if s, ok := units[idx].(Service); ok {
g.s = append(g.s, s)
hasRegistered[idx] = true
}
}
return hasRegistered
}
func (g *Group) RegisterFlags() *FlagSet {
// run configuration stage
g.f = NewFlagSet(g.name)
g.f.SortFlags = false // keep order of flag registration
g.f.Usage = func() {
fmt.Printf("Flags:\n")
g.f.PrintDefaults()
}
gFS := NewFlagSet("Common Service options")
gFS.SortFlags = false
gFS.StringVarP(&g.name, "name", "n", g.name, `name of this service`)
gFS.BoolVar(&g.showRunGroup, "show-rungroup-units", false, "show rungroup units")
g.f.AddFlagSet(gFS.FlagSet)
// register flags from attached Config objects
fs := make([]*FlagSet, len(g.c))
for idx := range g.c {
// a Namer might have been deregistered
if g.c[idx] == nil {
continue
}
g.log.Debug().Str("name", g.c[idx].Name()).Uint32("registered", uint32(idx+1)).Uint32("total", uint32(len(g.c))).Msg("register flags")
fs[idx] = g.c[idx].FlagSet()
if fs[idx] == nil {
// no FlagSet returned
g.log.Debug().Str("name", g.c[idx].Name()).Msg("config object did not return a flagset")
continue
}
fs[idx].VisitAll(func(f *pflag.Flag) {
if g.f.Lookup(f.Name) != nil {
// log duplicate flag
g.log.Warn().Str("name", f.Name).Uint32("registered", uint32(idx+1)).Msg("ignoring duplicate flag")
return
}
g.f.AddFlag(f)
})
}
return g.f
}
// RunConfig runs the Config phase of all registered Config aware Units.
// Only use this function if needing to add additional wiring between config
// and (pre)run phases and a separate PreRunner phase is not an option.
// In most cases it is best to use the Run method directly as it will run the
// Config phase prior to executing the PreRunner and Service phases.
// If an error is returned the application must shut down as it is considered
// fatal.
func (g *Group) RunConfig() (interrupted bool, err error) {
g.log = logger.GetLogger(g.name)
g.configured = true
if g.name == "" {
// use the binary name if custom name has not been provided
g.name = path.Base(os.Args[0])
}
defer func() {
if err != nil {
g.log.Error().Err(err).Msg("unexpected exit")
}
}()
// Load config from env and file
if err = config.Load(g.f.Name, g.f.FlagSet); err != nil {
return false, errors.Wrapf(err, "%s fails to load config", g.f.Name)
}
// bail early on help or version requests
switch {
case g.showRunGroup:
fmt.Println(g.ListUnits())
return true, nil
}
// Validate Config inputs
for idx := range g.c {
// a Config might have been deregistered during Run
if g.c[idx] == nil {
g.log.Debug().Uint32("ran", uint32(idx+1)).Msg("skipping validate")
continue
}
g.log.Debug().Str("name", g.c[idx].Name()).Uint32("ran", uint32(idx+1)).Uint32("total", uint32(len(g.c))).Msg("validate config")
if vErr := g.c[idx].Validate(); vErr != nil {
err = multierr.Append(err, vErr)
}
}
// exit on at least one Validate error
if err != nil {
return false, err
}
// log binary name and version
g.log.Info().Msg("started")
return false, nil
}
// Run will execute all phases of all registered Units and block until an error
// occurs.
// If RunConfig has been called prior to Run, the Group's Config phase will be
// skipped and Run continues with the PreRunner and Service phases.
//
// The following phases are executed in the following sequence:
//
// Config phase (serially, in order of Unit registration)
// - FlagSet() Get & register all FlagSets from Config Units.
// - Flag Parsing Using the provided args (os.Args if empty)
// - Validate() Validate Config Units. Exit on first error.
//
// PreRunner phase (serially, in order of Unit registration)
// - PreRun() Execute PreRunner Units. Exit on first error.
//
// Service phase (concurrently)
// - Serve() Execute all Service Units in separate Go routines.
// - Wait Block until one of the Serve() methods returns
// - GracefulStop() Call interrupt handlers of all Service Units.
//
// Run will return with the originating error on:
// - first Config.Validate() returning an error
// - first PreRunner.PreRun() returning an error
// - first Service.Serve() returning (error or nil)
func (g *Group) Run() (err error) {
// run config registration and flag parsing stages
if interrupted, errRun := g.RunConfig(); interrupted || errRun != nil {
return errRun
}
defer func() {
if err != nil {
g.log.Fatal().Err(err).Stack().Msg("unexpected exit")
}
}()
// execute pre run stage and exit on error
for idx := range g.p {
// a PreRunner might have been deregistered during Run
if g.p[idx] == nil {
continue
}
g.log.Debug().Uint32("ran", uint32(idx+1)).Uint32("total", uint32(len(g.p))).Str("name", g.p[idx].Name()).Msg("pre-run")
if err := g.p[idx].PreRun(); err != nil {
return err
}
}
swg := &sync.WaitGroup{}
swg.Add(len(g.s))
go func() {
swg.Wait()
close(g.readyCh)
}()
// feed our registered services to our internal run.Group
for idx := range g.s {
// a Service might have been deregistered during Run
s := g.s[idx]
if s == nil {
continue
}
g.log.Debug().Uint32("total", uint32(len(g.s))).Uint32("ran", uint32(idx+1)).Str("name", s.Name()).Msg("serve")
g.r.Add(func() error {
notify := s.Serve()
swg.Done()
<-notify
return nil
}, func(_ error) {
g.log.Debug().Uint32("total", uint32(len(g.s))).Uint32("ran", uint32(idx+1)).Str("name", s.Name()).Msg("stop")
s.GracefulStop()
})
}
// start registered services and block
return g.r.Run()
}
// ListUnits returns a list of all Group phases and the Units registered to each
// of them.
func (g Group) ListUnits() string {
var (
s string
t = "cli"
)
if len(g.c) > 0 {
s += "\n- config: "
for _, u := range g.c {
if u != nil {
s += u.Name() + " "
}
}
}
if len(g.p) > 0 {
s += "\n- prerun: "
for _, u := range g.p {
if u != nil {
s += u.Name() + " "
}
}
}
if len(g.s) > 0 {
s += "\n- serve : "
for _, u := range g.s {
if u != nil {
t = "svc"
s += u.Name() + " "
}
}
}
return fmt.Sprintf("Group: %s [%s]%s", g.name, t, s)
}
func (g *Group) WaitTillReady() {
<-g.readyCh
}