blob: f25d2783946ffdda470ee2b9a91e767265798e78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package component
import (
"time"
)
import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
)
const (
backoffTime = 5 * time.Second
)
type resilientComponent struct {
log logr.Logger
component Component
}
func NewResilientComponent(log logr.Logger, component Component) Component {
return &resilientComponent{
log: log,
component: component,
}
}
func (r *resilientComponent) Start(stop <-chan struct{}) error {
r.log.Info("starting resilient component ...")
for generationID := uint64(1); ; generationID++ {
errCh := make(chan error, 1)
go func(errCh chan<- error) {
defer close(errCh)
// recover from a panic
defer func() {
if e := recover(); e != nil {
if err, ok := e.(error); ok {
errCh <- errors.WithStack(err)
} else {
errCh <- errors.Errorf("%v", e)
}
}
}()
errCh <- r.component.Start(stop)
}(errCh)
select {
case <-stop:
r.log.Info("done")
return nil
case err := <-errCh:
if err != nil {
r.log.WithValues("generationID", generationID).Error(err, "component terminated with an error")
}
}
<-time.After(backoffTime)
}
}
func (r *resilientComponent) NeedLeaderElection() bool {
return r.component.NeedLeaderElection()
}