blob: 2e7fe2e8084a3d2741f6843264100877512824bf [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package envoy
import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"sync"
"time"
)
import (
envoyAdmin "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
"istio.io/pkg/log"
)
// defaultName is the instance name used in log messages when Config.Name is empty.
const defaultName = "envoy"

// defaultLiveTimeout bounds how long NewInstanceForHotRestart waits for the parent
// Instance to go live before proceeding with the hot restart anyway.
const defaultLiveTimeout = 20 * time.Second
// Config for an Envoy Instance. It is consumed by New; the resulting Instance exposes
// it again through Instance.Config.
type Config struct {
	// Options provides the command-line options to be passed to Envoy. New validates them,
	// and NewInstanceForHotRestart copies them with the epoch option replaced.
	Options Options
	// Name of the envoy instance, used for logging only. If not provided, defaults to "envoy".
	Name string
	// BinaryPath the path to the Envoy binary. Required: New returns an error if it is empty.
	BinaryPath string
	// WorkingDir to be used when running Envoy. If not set, the current working directory is used.
	WorkingDir string
	// AdminPort specifies the administration port for the Envoy server. If not set, will
	// be determined by parsing the Envoy bootstrap configuration file.
	AdminPort uint32
	// SkipBaseIDClose skips calling Close on the BaseID, if one was specified. Independently of
	// this flag, the BaseID is also not closed once a hot restart Instance has been created,
	// because ownership of the shared memory passes to that Instance.
	SkipBaseIDClose bool
}
// Waitable is a deferred wait operation: configure it with WithTimeout if desired,
// then block on it with Do.
type Waitable interface {
	// Do blocks until the operation completes. Without a timeout it may wait forever.
	// When a timeout set via WithTimeout expires, Do returns the most recent error seen by a
	// retried operation, or context.DeadlineExceeded if no such error was recorded.
	Do() error

	// WithTimeout returns a Waitable whose Do gives up after at most timeout.
	WithTimeout(timeout time.Duration) Waitable
}
// Instance of an Envoy process.
type Instance interface {
	// Config returns the configuration for this Instance.
	Config() Config

	// BaseID used to start Envoy. If not set, returns InvalidBaseID.
	BaseID() BaseID

	// Epoch used to start Envoy. If it was not set, defaults to 0.
	Epoch() Epoch

	// Start the Envoy Instance and return it. The process will be killed if the given context
	// is canceled. Calling Start on an Instance that has already been started has no effect.
	//
	// Start does not wait for Envoy to go live. Hot restart fails if the previous envoy process
	// is still initializing. For that reason, NewInstanceForHotRestart waits (up to a fixed
	// timeout) for the parent Instance to go live before it returns the new Instance.
	Start(ctx context.Context) Instance

	// NewInstanceForHotRestart creates a new Envoy Instance that is configured for a hot restart of this
	// Instance (i.e. epoch is incremented). During a hot restart of Envoy, the old process is drained and
	// traffic is shifted over to the new process.
	//
	// Before creating the new Instance, this method waits for this Instance to go live, up to a
	// fixed timeout. If it does not go live in time, a warning is logged and the hot restart
	// proceeds anyway.
	//
	// The caller must Start the returned instance to initiate the hot restart.
	//
	// If a new Instance is successfully created, it assumes ownership of the Envoy shared memory segment
	// used for hot restart. This means that when this Instance exits, it will no longer destroy
	// the shared memory segment, regardless of the value of SkipBaseIDClose.
	//
	// If this Instance hasn't been started, calling this method does nothing and simply returns
	// this Instance since there is nothing to restart.
	//
	// Once a hot restart Instance has been created, subsequent calls return an error.
	NewInstanceForHotRestart() (Instance, error)

	// WaitLive returns a Waitable that polls the Envoy ServerInfo endpoint until the server state
	// is LIVE.
	//
	// The wait fails immediately if Envoy reports that it is DRAINING. It also fails if the
	// process exits first. If the wait times out, Do returns the last known error, or
	// context.DeadlineExceeded if no error occurred within the specified duration.
	WaitLive() Waitable

	// AdminPort gets the administration port for Envoy.
	AdminPort() uint32

	// GetServerInfo queries the Envoy admin /server_info endpoint and returns the result.
	GetServerInfo() (*envoyAdmin.ServerInfo, error)

	// GetConfigDump fetches the config dump from the Envoy admin port and returns the response.
	GetConfigDump() (*envoyAdmin.ConfigDump, error)

	// Wait returns a Waitable that completes when the Instance terminates.
	//
	// Its Do reports the error that ended the Instance: a failure to start, the process exit
	// error, or the Start context's error. It reports nil on a clean exit.
	Wait() Waitable

	// Kill the process, if running. Returns an error if the process was never started.
	Kill() error

	// KillAndWait is a helper that calls Kill and then waits for the process to terminate.
	// If Kill fails, the returned Waitable reports that error without waiting.
	KillAndWait() Waitable

	// Shutdown initiates the graceful termination of Envoy via its admin port. Returns
	// immediately and does not wait for the process to exit.
	Shutdown() error

	// ShutdownAndWait is a helper that calls Shutdown and waits for the process to terminate.
	// If Shutdown fails, the returned Waitable reports that error without waiting.
	ShutdownAndWait() Waitable

	// DrainListeners drains listeners of Envoy so that inflight requests
	// can gracefully finish and even continue making outbound calls as needed.
	DrainListeners() error
}
// FactoryFunc builds an Envoy Instance from a Config. New is the default implementation;
// callers can substitute their own (for example, in tests).
type FactoryFunc func(cfg Config) (Instance, error)

// Compile-time check that New matches the FactoryFunc signature.
var _ = FactoryFunc(New)
// New validates cfg and prepares, but does not start, an Envoy Instance.
//
// BinaryPath is required. An empty Name is replaced with defaultName. The Options are
// validated against a fresh config context, which is then queried for the base ID and epoch.
// When AdminPort is zero, the admin port also comes from that context.
//
// The returned Instance wraps an os/exec command for BinaryPath. The command writes to this
// process's stdout/stderr and runs in WorkingDir when one is given.
func New(cfg Config) (Instance, error) {
	if cfg.BinaryPath == "" {
		return nil, errors.New("must specify an Envoy binary")
	}
	if cfg.Name == "" {
		cfg.Name = defaultName
	}

	cc := newConfigContext()
	if err := cfg.Options.validate(cc); err != nil {
		return nil, err
	}

	// An explicitly configured admin port wins; otherwise ask the config context.
	port := cfg.AdminPort
	if port == 0 {
		derived, err := cc.getAdminPort()
		if err != nil {
			return nil, err
		}
		port = derived
	}

	cmd := exec.Command(cfg.BinaryPath, cfg.Options.ToArgs()...)
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	// An empty Dir means "use the current working directory", matching an unset WorkingDir.
	cmd.Dir = cfg.WorkingDir

	inst := &instance{
		config:    cfg,
		name:      cfg.Name,
		cmd:       cmd,
		waitCh:    make(chan struct{}, 1),
		adminPort: port,
		baseID:    cc.baseID,
		epoch:     cc.epoch,
	}
	return inst, nil
}
// instance is the Instance implementation returned by New. It runs the Envoy binary
// through an os/exec command.
type instance struct {
	// config is the Config passed to New, with Name already defaulted.
	config Config
	// name is config.Name; it is used in log messages via logID.
	name string
	// waitErr is the error reported by Wait. It is set by Start: from a cmd.Start failure,
	// from cmd.Wait, or from the Start context's error on cancellation.
	// It is always written before waitCh is closed.
	waitErr error
	// cmd is the Envoy process command built by New.
	cmd *exec.Cmd
	// waitCh is closed by close() once the instance has terminated or failed to start.
	waitCh chan struct{}
	// adminPort is the Envoy admin port used by the admin-endpoint helpers.
	adminPort uint32
	// baseID is the hot-restart base ID obtained while validating the options.
	baseID BaseID
	// epoch is the hot-restart epoch obtained while validating the options.
	epoch Epoch
	// started is set by the first call to Start.
	started bool
	// hotRestart is the successor created by NewInstanceForHotRestart. When non-nil,
	// close() does not release baseID.
	hotRestart Instance
	// mux serializes Start and NewInstanceForHotRestart.
	// NOTE(review): close() reads hotRestart without holding mux when called from Start's
	// background goroutine — looks like a potential data race; confirm with -race.
	mux sync.Mutex
}
// Config returns, by value, the Config this instance was built from (Name already defaulted).
func (i *instance) Config() Config {
	cfg := i.config
	return cfg
}
// Start launches the Envoy process and returns this instance.
//
// Only the first call has any effect; later calls are logged and ignored. If the process
// cannot be launched, the launch error becomes the wait error and the instance is closed
// immediately, so Wait().Do() returns that error.
//
// Once the process is running, two goroutines are spawned:
//   - The first blocks on cmd.Wait and forwards its result on a buffered channel, so it
//     never blocks on the send.
//   - The second waits for either process exit or cancellation of ctx. On cancellation it
//     records ctx.Err() as the wait error and kills the process. It then waits for the
//     process to actually exit before closing the instance, so that Wait and KillAndWait
//     only return once Envoy is gone.
func (i *instance) Start(ctx context.Context) Instance {
	i.mux.Lock()
	defer i.mux.Unlock()

	// Make sure we haven't already started.
	if i.started {
		log.Infof("%s was already started, skipping Start", i.logID())
		return i
	}
	i.started = true

	log.Infof("%s starting with command: %v", i.logID(), i.cmd.Args)

	// Start Envoy.
	if err := i.cmd.Start(); err != nil {
		i.waitErr = err
		i.close()
		return i
	}

	// Asynchronously wait for the command to terminate and forward the result of cmd.Wait.
	doneCh := make(chan error, 1)
	go func() {
		doneCh <- i.cmd.Wait()
		close(doneCh)
	}()

	go func() {
		// Free all resources and close waitCh when we exit.
		defer i.close()

		select {
		case <-ctx.Done():
			log.Infof("%s Aborting: %v", i.logID(), ctx.Err())
			i.waitErr = ctx.Err()

			// Context aborted ... kill the process.
			if err := i.Kill(); err != nil {
				log.Warnf("%s kill failed: %v", i.logID(), err)
			}

			// Do not release waiters until the process has really exited; otherwise
			// Wait/KillAndWait could return while Envoy is still running.
			exitErr := <-doneCh
			log.Infof("%s exited after abort: %v", i.logID(), exitErr)
		case i.waitErr = <-doneCh:
			log.Infof("%s exited with error: %v", i.logID(), i.waitErr)
		}
	}()
	return i
}
// NewInstanceForHotRestart builds (but does not start) the successor instance used to
// hot-restart this one.
//
// The successor gets a copy of this instance's Config with any existing epoch option
// removed and epoch+1 appended. On success it is recorded in i.hotRestart, which makes
// further calls fail and stops close() from releasing the shared BaseID.
//
// An instance that was never started is returned unchanged, since there is nothing to
// restart.
func (i *instance) NewInstanceForHotRestart() (Instance, error) {
	i.mux.Lock()
	defer i.mux.Unlock()

	switch {
	case i.hotRestart != nil:
		return nil, fmt.Errorf("%s already created a hot restart Instance", i.logID())
	case !i.started:
		// Nothing is running, so there is nothing to restart.
		return i, nil
	}

	// Hot restart fails while the parent is still initializing, so give it a bounded
	// window to go live. A failure here is only logged; the restart proceeds regardless.
	if liveErr := i.WaitLive().WithTimeout(defaultLiveTimeout).Do(); liveErr != nil {
		log.Warnf("%s failed to go live: %v. Proceeding with hot restart",
			i.logID(), liveErr)
	}

	// Clone the configuration, dropping any epoch option, then append the next epoch.
	next := i.config
	next.Options = make(Options, 0, len(i.config.Options))
	for _, opt := range i.config.Options {
		if opt.FlagName() == Epoch(0).FlagName() {
			continue
		}
		next.Options = append(next.Options, opt)
	}
	next.Options = append(next.Options, i.epoch+1)

	successor, err := New(next)
	if err != nil {
		return nil, err
	}
	i.hotRestart = successor
	return successor, nil
}
// WaitLive returns a Waitable that polls the admin /server_info endpoint every 200ms.
//
// Polling continues while the endpoint errors or reports a state other than LIVE or
// DRAINING. It stops successfully on LIVE. It stops with an error on DRAINING, since a
// draining server will never become live.
func (i *instance) WaitLive() Waitable {
	checkLive := func() (bool, error) {
		info, err := GetServerInfo(i.adminPort)
		if err != nil {
			return true, err
		}
		if info.State == envoyAdmin.ServerInfo_LIVE {
			return false, nil
		}
		if info.State == envoyAdmin.ServerInfo_DRAINING {
			return false, errors.New("envoy will never go live, it's already draining")
		}
		return true, fmt.Errorf("envoy not live. Server State: %s", info.State)
	}

	return &waitableImpl{
		instance:     i,
		retryPeriod:  200 * time.Millisecond,
		retryHandler: checkLive,
	}
}
// AdminPort reports the Envoy admin port chosen by New: either Config.AdminPort or the
// port obtained from the config context.
func (i *instance) AdminPort() uint32 {
	port := i.adminPort
	return port
}
// BaseID reports the hot-restart base ID captured when the instance was created.
func (i *instance) BaseID() BaseID {
	id := i.baseID
	return id
}
// Epoch reports the hot-restart epoch captured when the instance was created.
func (i *instance) Epoch() Epoch {
	e := i.epoch
	return e
}
// GetServerInfo queries /server_info on this instance's admin port.
func (i *instance) GetServerInfo() (*envoyAdmin.ServerInfo, error) {
	port := i.adminPort
	return GetServerInfo(port)
}
// GetConfigDump fetches the config dump from this instance's admin port.
func (i *instance) GetConfigDump() (*envoyAdmin.ConfigDump, error) {
	port := i.adminPort
	return GetConfigDump(port)
}
// Wait returns a Waitable that completes when the instance's waitCh is closed, i.e. when
// the instance has terminated or failed to start.
func (i *instance) Wait() Waitable {
	w := &waitableImpl{}
	w.instance = i
	return w
}
// Kill sends a kill to the Envoy process. It fails if the process was never started.
func (i *instance) Kill() error {
	if proc := i.cmd.Process; proc != nil {
		return proc.Kill()
	}
	return errors.New("envoy process was not started")
}
// KillAndWait kills the process right away and returns a Waitable for its termination.
// If the kill itself fails, the Waitable's Do returns that error without waiting.
func (i *instance) KillAndWait() Waitable {
	killErr := i.Kill()
	return &waitableImpl{instance: i, creationErr: killErr}
}
// Shutdown asks Envoy, through its admin port, to terminate gracefully; it does not wait.
func (i *instance) Shutdown() error {
	port := i.adminPort
	return Shutdown(port)
}
// ShutdownAndWait requests a graceful shutdown immediately and returns a Waitable for
// termination. If the shutdown request fails, Do returns that error without waiting.
func (i *instance) ShutdownAndWait() Waitable {
	shutdownErr := i.Shutdown()
	return &waitableImpl{instance: i, creationErr: shutdownErr}
}
// DrainListeners asks Envoy, via its admin port, to drain its listeners.
// NOTE(review): the meaning of the `true` argument is defined by the package-level
// DrainListeners helper (not visible here) — confirm before changing it.
func (i *instance) DrainListeners() error {
	port := i.adminPort
	return DrainListeners(port, true)
}
// close releases the instance's resources and signals waiters that it has terminated.
// It closes waitCh, so it must run exactly once.
//
// The hot-restart shared memory behind baseID is released here only if this instance
// still owns it. It does not own it when Config.SkipBaseIDClose is set, or when a
// hot-restart instance has been created (ownership was handed to that instance).
//
// NOTE(review): when invoked from Start's background goroutine this reads i.hotRestart
// without holding i.mux, while NewInstanceForHotRestart writes it under i.mux — looks like
// a potential data race; confirm with -race.
func (i *instance) close() {
	ownsSharedMemory := !i.config.SkipBaseIDClose && i.hotRestart == nil
	if ownsSharedMemory {
		if err := i.baseID.Close(); err != nil {
			log.Infof("Failed freeing BaseID for %s: %v", i.logID(), err)
		}
	}
	close(i.waitCh)
}
// logID returns the prefix used in this instance's log messages, identifying it by
// name and hot-restart epoch.
func (i *instance) logID() string {
	const format = "Envoy '%s' (epoch %d)"
	return fmt.Sprintf(format, i.name, i.epoch)
}
// Compile-time assertion that *waitableImpl implements Waitable.
var _ Waitable = (*waitableImpl)(nil)

// waitableImpl is the Waitable handed out by instance. It waits for the embedded
// instance's waitCh to close and, when retryPeriod is positive, also polls retryHandler.
type waitableImpl struct {
	*instance

	// creationErr, when non-nil, is returned by Do without waiting (e.g. a failed Kill
	// or Shutdown request).
	creationErr error
	// retryPeriod is the interval between retryHandler calls; zero disables polling.
	retryPeriod time.Duration
	// retryHandler reports whether polling should continue and the error seen on this attempt.
	retryHandler func() (bool, error)
	// timeout bounds the total wait; zero means wait indefinitely.
	timeout time.Duration
}
// Do blocks until one of three things happens:
//   - The instance terminates (waitCh is closed).
//   - The retry handler stops asking to be retried.
//   - The optional timeout fires.
//
// When the instance terminates, Do returns its wait error if any. Otherwise it returns the
// last retry error. If a retried operation was still pending, it returns an error saying
// Envoy exited first. Otherwise it returns nil.
//
// When the timeout fires, Do returns the last retry error, or context.DeadlineExceeded if
// none was recorded.
func (w *waitableImpl) Do() error {
	if err := w.creationErr; err != nil {
		return err
	}

	// Receiving from a nil channel blocks forever, so a nil deadline/tick simply
	// disables that select case.
	var (
		deadline <-chan time.Time
		tick     <-chan time.Time
	)
	if w.timeout > 0 {
		timer := time.NewTimer(w.timeout)
		defer timer.Stop()
		deadline = timer.C
	}
	polling := w.retryPeriod > 0
	if polling {
		ticker := time.NewTicker(w.retryPeriod)
		defer ticker.Stop()
		tick = ticker.C
	}

	var lastErr error
	for {
		select {
		case <-w.waitCh:
			switch {
			case w.waitErr != nil:
				return w.waitErr
			case lastErr != nil:
				return lastErr
			case polling:
				// Envoy exited before the retried operation succeeded.
				return errors.New("envoy process exited before wait completed")
			default:
				return nil
			}
		case <-tick:
			retry, err := w.retryHandler()
			if !retry {
				return err
			}
			lastErr = err
		case <-deadline:
			if lastErr == nil {
				// Timed out before anything else happened.
				return context.DeadlineExceeded
			}
			return lastErr
		}
	}
}
// WithTimeout returns a copy of this Waitable whose Do gives up after d; the receiver is
// left unmodified.
func (w *waitableImpl) WithTimeout(d time.Duration) Waitable {
	c := new(waitableImpl)
	*c = *w
	c.timeout = d
	return c
}