blob: 353e5002b517dabdbde2c4afe5379ea2872b762f [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"sync"
)
import (
"istio.io/pkg/log"
)
type Component func(stop <-chan struct{}) error
// Instance is a server that is composed a number of Component tasks.
type Instance interface {
// Start this Server. Any components that were already added
// will be run immediately. If any error is returned,
// Start will terminate and return the error immediately.
//
// Once all startup components have been run, starts a polling
// loop to continue monitoring for new components and returns nil.
Start(stop <-chan struct{}) error
// RunComponent adds the given component to the server's run queue.
RunComponent(t Component)
// RunComponentAsync runs the given component asynchronously.
RunComponentAsync(t Component)
// RunComponentAsyncAndWait runs the given component asynchronously. When
// the serer Instance is shutting down, it will wait for the component
// to complete before exiting.
// Note: this is best effort; a process can die at any time.
RunComponentAsyncAndWait(t Component)
// Wait for this server Instance to shutdown.
Wait()
}
var _ Instance = &instance{}
// New creates a new server Instance.
func New() Instance {
return &instance{
done: make(chan struct{}),
components: make(chan Component, 1000), // should be enough?
}
}
type instance struct {
components chan Component
done chan struct{}
// requiredTerminations keeps track of tasks that should block instance exit
// if they are not stopped. This allows important cleanup tasks to be completed.
// Note: this is still best effort; a process can die at any time.
requiredTerminations sync.WaitGroup
}
func (i *instance) Start(stop <-chan struct{}) error {
shutdown := func() {
close(i.done)
}
// First, drain all startup tasks and immediately return if any fail.
for startupDone := false; !startupDone; {
select {
case next := <-i.components:
if err := next(stop); err != nil {
// Startup error: terminate and return the error.
shutdown()
return err
}
default:
// We've drained all of the initial tasks.
// Break out of the loop and run asynchronously.
startupDone = true
}
}
// Start the run loop to continue tasks added after the instance is started.
go func() {
for {
select {
case <-stop:
// Wait for any tasks that are required for termination.
i.requiredTerminations.Wait()
// Indicate that this instance is not terminated.
shutdown()
return
case next := <-i.components:
if err := next(stop); err != nil {
logComponentError(err)
}
}
}
}()
return nil
}
func (i *instance) RunComponent(t Component) {
select {
case <-i.done:
log.Warnf("attempting to run a new component after the server was shutdown")
default:
i.components <- t
}
}
func (i *instance) RunComponentAsync(task Component) {
i.RunComponent(func(stop <-chan struct{}) error {
go func() {
err := task(stop)
if err != nil {
logComponentError(err)
}
}()
return nil
})
}
func (i *instance) RunComponentAsyncAndWait(task Component) {
i.RunComponent(func(stop <-chan struct{}) error {
i.requiredTerminations.Add(1)
go func() {
err := task(stop)
if err != nil {
logComponentError(err)
}
i.requiredTerminations.Done()
}()
return nil
})
}
func (i *instance) Wait() {
<-i.done
}
func logComponentError(err error) {
log.Errorf("failure in server component: %v", err)
}